import errno
import json
import time
import os
from filelock import FileLock
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from future.utils import raise_from
import requests_gssapi
from ..requests import Request, munchify, requests, handle_errors
from ..helpers import for_all_methods, bind_proxy, config_from_file
from ..exceptions import CoprAuthException
[docs]@for_all_methods(bind_proxy)
class BaseProxy(object):
"""
Parent class for all other proxies
"""
def __init__(self, config):
self.config = config
self._auth_token_cached = None
self._auth_username = None
self.request = Request(api_base_url=self.api_base_url, connection_attempts=config.get("connection_attempts", 1))
@classmethod
def create_from_config_file(cls, path=None):
config = config_from_file(path)
return cls(config)
@property
def api_base_url(self):
return os.path.join(self.config["copr_url"], "api_3", "")
@property
def auth(self):
if self._auth_token_cached:
return self._auth_token_cached
if self.config.get("token"):
self._auth_token_cached = self.config["login"], self.config["token"]
self._auth_username = self.config.get("username")
elif self.config.get("gssapi"):
session_data = self._get_session_cookie_via_gssapi()
self._auth_token_cached = session_data["session"]
self._auth_username = session_data["name"]
else:
msg = "GSSAPI disabled and login:token is invalid ({0}:{1})".format(
self.config.get("login", "NOT_SET"),
self.config.get("token", "NOT_SET"),
)
raise CoprAuthException(msg)
return self._auth_token_cached
def _get_session_cookie_via_gssapi(self):
"""
Return the cached session for the configured username. If not already
cached, new self.get_session_via_gssapi() is performed and result is
cached into ~/.config/copr/<session_file>.
"""
url = urlparse(self.config["copr_url"]).netloc
cachedir = os.path.join(os.path.expanduser("~"), ".cache", "copr")
try:
os.makedirs(cachedir)
except OSError as err:
if err.errno != errno.EEXIST:
raise
session_file = os.path.join(cachedir, url + "-session")
session_data = self._load_or_download_session(session_file)
return session_data
@staticmethod
def _load_session_from_file(session_file):
session_data = None
if os.path.exists(session_file):
with open(session_file, "r") as file:
session_data = json.load(file)
if session_data and session_data["expiration"] > time.time():
return session_data
return None
def _load_or_download_session(self, session_file):
lock = FileLock(session_file + ".lock")
with lock:
session = BaseProxy._load_session_from_file(session_file)
if session:
return session
# TODO: create Munch sub-class that returns serializable dict, we
# have something like that in Cli: cli/copr_cli/util.py:serializable()
session_data = self.get_session_via_gssapi()
session_data = session_data.__dict__
session_data.pop("__response__", None)
session_data.pop("__proxy__", None)
BaseProxy._save_session(session_file, session_data)
return session_data
@staticmethod
def _save_session(session_file, session_data):
with open(session_file, "w") as file:
session_data["expiration"] = time.time() + 10 * 3600 # +10 hours
file.write(json.dumps(session_data, indent=4) + "\n")
[docs] def get_session_via_gssapi(self):
"""
Obtain a _new_ session using GSSAPI route
:return: Munch, provides user's "id", "name", "session" cookie, and
"expiration".
"""
url = self.config["copr_url"] + "/api_3/gssapi_login/"
session = requests.Session()
auth = requests_gssapi.HTTPSPNEGOAuth(opportunistic_auth=True)
try:
response = session.get(url, auth=auth)
except requests_gssapi.exceptions.SPNEGOExchangeError as err:
msg = "Can not get session for {0} cookie via GSSAPI: {1}".format(
self.config["copr_url"], err)
raise_from(CoprAuthException(msg), err)
handle_errors(response)
retval = munchify(response)
retval.session = response.cookies.get("session")
return retval
[docs] def home(self):
"""
Call the Copr APIv3 base URL
:return: Munch
"""
endpoint = ""
response = self.request.send(endpoint=endpoint)
return munchify(response)
[docs] def auth_check(self):
"""
Call an endpoint protected by login to check whether the user auth key is valid
:return: Munch
"""
endpoint = "/auth-check"
response = self.request.send(endpoint=endpoint, auth=self.auth)
return munchify(response)
[docs] def auth_username(self):
"""
Return the username (string) assigned to this configuration. May
contact the server and authenticate if needed.
"""
if not self._auth_username:
# perform authentication as a side effect
_ = self.auth
return self._auth_username