Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import base64 
  2  import datetime 
  3  import functools 
  4  from functools import wraps, partial 
  5   
  6  from netaddr import IPAddress, IPNetwork 
  7  import re 
  8  import flask 
  9   
 10  from openid_teams.teams import TeamsRequest 
 11   
 12  from coprs import app 
 13  from coprs import db 
 14  from coprs import helpers 
 15  from coprs import models 
 16  from coprs import oid 
 17  from coprs.logic.complex_logic import ComplexLogic 
 18  from coprs.logic.users_logic import UsersLogic 
 19  from coprs.logic.coprs_logic import CoprsLogic 
20 21 22 -def fed_openidize_name(name):
23 """ 24 Create proper Fedora OpenID name from short name. 25 26 >>> fedoraoid == fed_openidize_name(user.name) 27 True 28 """ 29 30 return "http://{0}.id.fedoraproject.org/".format(name)
31
32 33 -def create_user_wrapper(username, email, timezone=None):
34 expiration_date_token = datetime.date.today() + \ 35 datetime.timedelta( 36 days=flask.current_app.config["API_TOKEN_EXPIRATION"]) 37 38 copr64 = base64.b64encode(b"copr") + b"##" 39 user = models.User(username=username, mail=email, 40 timezone=timezone, 41 api_login=copr64.decode("utf-8") + helpers.generate_api_token( 42 app.config["API_TOKEN_LENGTH"] - len(copr64)), 43 api_token=helpers.generate_api_token( 44 app.config["API_TOKEN_LENGTH"]), 45 api_token_expiration=expiration_date_token) 46 return user
47
48 49 -def fed_raw_name(oidname):
50 return oidname.replace(".id.fedoraproject.org/", "") \ 51 .replace("http://", "")
52
53 54 -def krb_straighten_username(krb_remote_user):
55 # Input should look like 'USERNAME@REALM.TLD', strip realm. 56 username = re.sub(r'@.*', '', krb_remote_user) 57 58 # But USERNAME part can consist of USER/DOMAIN.TLD. 59 # TODO: Do we need more clever thing here? 60 username = re.sub('/', '_', username) 61 62 # Based on restrictions for project name: "letters, digits, underscores, 63 # dashes and dots", it is worth limitting the username here, too. 64 # TODO: Store this pattern on one place. 65 return username if re.match(r"^[\w.-]+$", username) else None
66
67 68 @app.before_request 69 -def set_empty_user():
70 flask.g.user = None
71
72 73 @app.before_request 74 -def lookup_current_user():
75 flask.g.user = username = None 76 if "openid" in flask.session: 77 username = fed_raw_name(flask.session["openid"]) 78 elif "krb5_login" in flask.session: 79 username = flask.session["krb5_login"] 80 81 if username: 82 flask.g.user = models.User.query.filter( 83 models.User.username == username).first()
84
85 86 @app.errorhandler(404) 87 -def page_not_found(message):
88 return flask.render_template("404.html", message=message), 404
89
90 91 @app.errorhandler(403) 92 -def access_restricted(message):
93 return flask.render_template("403.html", message=message), 403
94
95 96 -def generic_error(message, code=500, title=None):
97 """ 98 :type message: str 99 :type err: CoprHttpException 100 """ 101 return flask.render_template("_error.html", 102 message=message, 103 error_code=code, 104 error_title=title), code
105 106 107 server_error_handler = partial(generic_error, code=500, title="Internal Server Error") 108 bad_request_handler = partial(generic_error, code=400, title="Bad Request") 109 110 app.errorhandler(500)(server_error_handler) 111 app.errorhandler(400)(bad_request_handler) 112 113 misc = flask.Blueprint("misc", __name__) 114 115 116 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
117 -def krb5_login(name):
118 """ 119 Handle the Kerberos authentication. 120 121 Note that if we are able to get here, either the user is authenticated 122 correctly, or apache is mis-configured and it does not perform KRB 123 authentication at all. Note also, even if that can be considered ugly, we 124 are reusing oid's get_next_url feature with kerberos login. 125 """ 126 127 # Already logged in? 128 if flask.g.user is not None: 129 return flask.redirect(oid.get_next_url()) 130 131 krb_config = app.config['KRB5_LOGIN'] 132 133 found = None 134 for key in krb_config.keys(): 135 if krb_config[key]['URI'] == name: 136 found = key 137 break 138 139 if not found: 140 # no KRB5_LOGIN.<name> configured in copr.conf 141 return flask.render_template("404.html"), 404 142 143 if 'REMOTE_USER' not in flask.request.environ: 144 nocred = "Kerberos authentication failed (no credentials provided)" 145 return flask.render_template("403.html", message=nocred), 403 146 147 krb_username = flask.request.environ['REMOTE_USER'] 148 app.logger.debug("krb5 login attempt: " + krb_username) 149 username = krb_straighten_username(krb_username) 150 if not username: 151 message = "invalid krb5 username: " + krb_username 152 return flask.render_template("403.html", message=message), 403 153 154 krb_login = ( 155 models.Krb5Login.query 156 .filter(models.Krb5Login.config_name == key) 157 .filter(models.Krb5Login.primary == username) 158 .first() 159 ) 160 if krb_login: 161 flask.g.user = krb_login.user 162 flask.session['krb5_login'] = krb_login.user.name 163 flask.flash(u"Welcome, {0}".format(flask.g.user.name)) 164 return flask.redirect(oid.get_next_url()) 165 166 # We need to create row in 'krb5_login' table 167 user = models.User.query.filter(models.User.username == username).first() 168 if not user: 169 # Even the item in 'user' table does not exist, create _now_ 170 email = username + "@" + krb_config[key]['email_domain'] 171 user = create_user_wrapper(username, email) 172 db.session.add(user) 173 174 krb_login = models.Krb5Login(user=user, primary=username, config_name=key) 175 db.session.add(krb_login) 176 db.session.commit() 177 178 flask.flash(u"Welcome, {0}".format(user.name)) 179 flask.g.user = user 180 flask.session['krb5_login'] = user.name 181 return flask.redirect(oid.get_next_url())
182 183 184 @misc.route("/login/", methods=["GET"])
185 @oid.loginhandler 186 -def login():
187 if flask.g.user is not None: 188 return flask.redirect(oid.get_next_url()) 189 else: 190 # a bit of magic 191 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"]) 192 return oid.try_login("https://id.fedoraproject.org/", 193 ask_for=["email", "timezone"], 194 extensions=[team_req])
195
196 197 @oid.after_login 198 -def create_or_login(resp):
199 flask.session["openid"] = resp.identity_url 200 fasusername = resp.identity_url.replace( 201 ".id.fedoraproject.org/", "").replace("http://", "") 202 203 # kidding me.. or not 204 if fasusername and ( 205 ( 206 app.config["USE_ALLOWED_USERS"] and 207 fasusername in app.config["ALLOWED_USERS"] 208 ) or not app.config["USE_ALLOWED_USERS"]): 209 210 username = fed_raw_name(resp.identity_url) 211 user = models.User.query.filter( 212 models.User.username == username).first() 213 if not user: # create if not created already 214 user = create_user_wrapper(username, resp.email, resp.timezone) 215 else: 216 user.mail = resp.email 217 user.timezone = resp.timezone 218 if "lp" in resp.extensions: 219 team_resp = resp.extensions['lp'] # name space for the teams extension 220 user.openid_groups = {"fas_groups": team_resp.teams} 221 222 db.session.add(user) 223 db.session.commit() 224 flask.flash(u"Welcome, {0}".format(user.name)) 225 flask.g.user = user 226 227 if flask.request.url_root == oid.get_next_url(): 228 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user", 229 username=user.name)) 230 return flask.redirect(oid.get_next_url()) 231 else: 232 flask.flash("User '{0}' is not allowed".format(fasusername)) 233 return flask.redirect(oid.get_next_url())
234
235 236 @misc.route("/logout/") 237 -def logout():
238 flask.session.pop("openid", None) 239 flask.session.pop("krb5_login", None) 240 flask.flash(u"You were signed out") 241 return flask.redirect(oid.get_next_url())
242
243 244 -def api_login_required(f):
245 @functools.wraps(f) 246 def decorated_function(*args, **kwargs): 247 token = None 248 apt_login = None 249 if "Authorization" in flask.request.headers: 250 base64string = flask.request.headers["Authorization"] 251 base64string = base64string.split()[1].strip() 252 userstring = base64.b64decode(base64string) 253 (apt_login, token) = userstring.decode("utf-8").split(":") 254 token_auth = False 255 if token and apt_login: 256 user = UsersLogic.get_by_api_login(apt_login).first() 257 if (user and user.api_token == token and 258 user.api_token_expiration >= datetime.date.today()): 259 260 token_auth = True 261 flask.g.user = user 262 if not token_auth: 263 output = { 264 "output": "notok", 265 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(app.config["FRONTEND_BASE_URL"]), 266 } 267 jsonout = flask.jsonify(output) 268 jsonout.status_code = 500 269 return jsonout 270 return f(*args, **kwargs)
271 return decorated_function 272
273 274 -def login_required(role=helpers.RoleEnum("user")):
275 def view_wrapper(f): 276 @functools.wraps(f) 277 def decorated_function(*args, **kwargs): 278 if flask.g.user is None: 279 return flask.redirect(flask.url_for("misc.login", 280 next=flask.request.url)) 281 282 if role == helpers.RoleEnum("admin") and not flask.g.user.admin: 283 flask.flash("You are not allowed to access admin section.") 284 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 285 286 return f(*args, **kwargs)
287 return decorated_function 288 # hack: if login_required is used without params, the "role" parameter 289 # is in fact the decorated function, so we need to return 290 # the wrapped function, not the wrapper 291 # proper solution would be to use login_required() with parentheses 292 # everywhere, even if they"re empty - TODO 293 if callable(role): 294 return view_wrapper(role) 295 else: 296 return view_wrapper 297
298 299 # backend authentication 300 -def backend_authenticated(f):
301 @functools.wraps(f) 302 def decorated_function(*args, **kwargs): 303 auth = flask.request.authorization 304 if not auth or auth.password != app.config["BACKEND_PASSWORD"]: 305 return "You have to provide the correct password\n", 401 306 307 return f(*args, **kwargs)
308 return decorated_function 309
310 311 -def intranet_required(f):
312 @functools.wraps(f) 313 def decorated_function(*args, **kwargs): 314 ip_addr = IPAddress(flask.request.remote_addr) 315 accept_ranges = set(app.config.get("INTRANET_IPS", [])) 316 accept_ranges.add("127.0.0.1") # always accept from localhost 317 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges): 318 return ("Stats can be update only from intranet hosts, " 319 "not {}, check config\n".format(flask.request.remote_addr)), 403 320 321 return f(*args, **kwargs)
322 return decorated_function 323
324 325 -def req_with_copr(f):
326 @wraps(f) 327 def wrapper(**kwargs): 328 coprname = kwargs.pop("coprname") 329 if "group_name" in kwargs: 330 group_name = kwargs.pop("group_name") 331 copr = ComplexLogic.get_group_copr_safe(group_name, coprname, with_mock_chroots=True) 332 else: 333 username = kwargs.pop("username") 334 copr = ComplexLogic.get_copr_safe(username, coprname, with_mock_chroots=True) 335 return f(copr, **kwargs)
336 return wrapper 337
338 339 @misc.route("/migration-report/") 340 @misc.route("/migration-report/<username>") 341 -def coprs_migration_report(username=None):
342 if not username and not flask.g.user: 343 return generic_error("You are not logged in") 344 elif not username: 345 username = flask.g.user.name 346 user = UsersLogic.get(username).first() 347 348 coprs = CoprsLogic.filter_without_group_projects(CoprsLogic.get_multiple_owned_by_username(username)).all() 349 for group in UsersLogic.get_groups_by_fas_names_list(user.user_teams).all(): 350 coprs.extend(CoprsLogic.get_multiple_by_group_id(group.id).all()) 351 352 return render_migration_report(coprs, user=user)
353
354 355 @misc.route("/migration-report/g/<group_name>") 356 -def group_coprs_migration_report(group_name=None):
357 group = ComplexLogic.get_group_by_name_safe(group_name) 358 coprs = CoprsLogic.get_multiple_by_group_id(group.id) 359 return render_migration_report(coprs, group=group)
360
361 362 -def render_migration_report(coprs, user=None, group=None):
363 return flask.render_template("migration-report.html", 364 user=user, 365 group=group, 366 coprs=coprs)
367