Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import os 
  2  import base64 
  3  import datetime 
  4  import functools 
  5  from functools import wraps, partial 
  6   
  7  from netaddr import IPAddress, IPNetwork 
  8  import re 
  9  import flask 
 10  from flask import send_file 
 11   
 12  from openid_teams.teams import TeamsRequest 
 13   
 14  from coprs import app 
 15  from coprs import db 
 16  from coprs import helpers 
 17  from coprs import models 
 18  from coprs import oid 
 19  from coprs.logic.complex_logic import ComplexLogic 
 20  from coprs.logic.users_logic import UsersLogic 
 21  from coprs.logic.coprs_logic import CoprsLogic 
22 23 24 -def fed_openidize_name(name):
25 """ 26 Create proper Fedora OpenID name from short name. 27 28 >>> fedoraoid == fed_openidize_name(user.name) 29 True 30 """ 31 32 return "http://{0}.id.fedoraproject.org/".format(name)
33
34 35 -def create_user_wrapper(username, email, timezone=None):
36 expiration_date_token = datetime.date.today() + \ 37 datetime.timedelta( 38 days=flask.current_app.config["API_TOKEN_EXPIRATION"]) 39 40 copr64 = base64.b64encode(b"copr") + b"##" 41 user = models.User(username=username, mail=email, 42 timezone=timezone, 43 api_login=copr64.decode("utf-8") + helpers.generate_api_token( 44 app.config["API_TOKEN_LENGTH"] - len(copr64)), 45 api_token=helpers.generate_api_token( 46 app.config["API_TOKEN_LENGTH"]), 47 api_token_expiration=expiration_date_token) 48 return user
49
50 51 -def fed_raw_name(oidname):
52 return oidname.replace(".id.fedoraproject.org/", "") \ 53 .replace("http://", "")
54
55 56 -def krb_straighten_username(krb_remote_user):
57 # Input should look like 'USERNAME@REALM.TLD', strip realm. 58 username = re.sub(r'@.*', '', krb_remote_user) 59 60 # But USERNAME part can consist of USER/DOMAIN.TLD. 61 # TODO: Do we need more clever thing here? 62 username = re.sub('/', '_', username) 63 64 # Based on restrictions for project name: "letters, digits, underscores, 65 # dashes and dots", it is worth limitting the username here, too. 66 # TODO: Store this pattern on one place. 67 return username if re.match(r"^[\w.-]+$", username) else None
68
69 70 @app.before_request 71 -def set_empty_user():
72 flask.g.user = None
73
74 75 @app.before_request 76 -def lookup_current_user():
77 flask.g.user = username = None 78 if "openid" in flask.session: 79 username = fed_raw_name(flask.session["openid"]) 80 elif "krb5_login" in flask.session: 81 username = flask.session["krb5_login"] 82 83 if username: 84 flask.g.user = models.User.query.filter( 85 models.User.username == username).first()
86
87 88 @app.errorhandler(404) 89 -def page_not_found(message):
90 return flask.render_template("404.html", message=message), 404
91
92 93 @app.errorhandler(403) 94 -def access_restricted(message):
95 return flask.render_template("403.html", message=message), 403
96
97 98 -def generic_error(message, code=500, title=None):
99 """ 100 :type message: str 101 :type err: CoprHttpException 102 """ 103 return flask.render_template("_error.html", 104 message=message, 105 error_code=code, 106 error_title=title), code
107 108 109 server_error_handler = partial(generic_error, code=500, title="Internal Server Error") 110 bad_request_handler = partial(generic_error, code=400, title="Bad Request") 111 112 app.errorhandler(500)(server_error_handler) 113 app.errorhandler(400)(bad_request_handler) 114 115 misc = flask.Blueprint("misc", __name__) 116 117 118 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
119 -def krb5_login(name):
120 """ 121 Handle the Kerberos authentication. 122 123 Note that if we are able to get here, either the user is authenticated 124 correctly, or apache is mis-configured and it does not perform KRB 125 authentication at all. Note also, even if that can be considered ugly, we 126 are reusing oid's get_next_url feature with kerberos login. 127 """ 128 129 # Already logged in? 130 if flask.g.user is not None: 131 return flask.redirect(oid.get_next_url()) 132 133 krb_config = app.config['KRB5_LOGIN'] 134 135 found = None 136 for key in krb_config.keys(): 137 if krb_config[key]['URI'] == name: 138 found = key 139 break 140 141 if not found: 142 # no KRB5_LOGIN.<name> configured in copr.conf 143 return flask.render_template("404.html"), 404 144 145 if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ: 146 # For local testing (without krb5 keytab and other configuration) 147 flask.request.environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER'] 148 149 if 'REMOTE_USER' not in flask.request.environ: 150 nocred = "Kerberos authentication failed (no credentials provided)" 151 return flask.render_template("403.html", message=nocred), 403 152 153 krb_username = flask.request.environ['REMOTE_USER'] 154 app.logger.debug("krb5 login attempt: " + krb_username) 155 username = krb_straighten_username(krb_username) 156 if not username: 157 message = "invalid krb5 username: " + krb_username 158 return flask.render_template("403.html", message=message), 403 159 160 krb_login = ( 161 models.Krb5Login.query 162 .filter(models.Krb5Login.config_name == key) 163 .filter(models.Krb5Login.primary == username) 164 .first() 165 ) 166 if krb_login: 167 flask.g.user = krb_login.user 168 flask.session['krb5_login'] = krb_login.user.name 169 flask.flash(u"Welcome, {0}".format(flask.g.user.name), "success") 170 return flask.redirect(oid.get_next_url()) 171 172 # We need to create row in 'krb5_login' table 173 user = models.User.query.filter(models.User.username == username).first() 174 if not user: 175 # Even the item in 'user' table does not exist, create _now_ 176 email = username + "@" + krb_config[key]['email_domain'] 177 user = create_user_wrapper(username, email) 178 db.session.add(user) 179 180 krb_login = models.Krb5Login(user=user, primary=username, config_name=key) 181 db.session.add(krb_login) 182 db.session.commit() 183 184 flask.flash(u"Welcome, {0}".format(user.name), "success") 185 flask.g.user = user 186 flask.session['krb5_login'] = user.name 187 return flask.redirect(oid.get_next_url())
188
189 190 @misc.route("/login/", methods=["GET"]) 191 @oid.loginhandler 192 -def login():
193 if not app.config['FAS_LOGIN']: 194 if app.config['KRB5_LOGIN']: 195 return krb5_login_redirect(next=oid.get_next_url()) 196 flask.flash("No auth method available", "error") 197 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 198 199 if flask.g.user is not None: 200 return flask.redirect(oid.get_next_url()) 201 else: 202 # a bit of magic 203 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"]) 204 return oid.try_login("https://id.fedoraproject.org/", 205 ask_for=["email", "timezone"], 206 extensions=[team_req])
207
208 209 @oid.after_login 210 -def create_or_login(resp):
211 flask.session["openid"] = resp.identity_url 212 fasusername = resp.identity_url.replace( 213 ".id.fedoraproject.org/", "").replace("http://", "") 214 215 # kidding me.. or not 216 if fasusername and ( 217 ( 218 app.config["USE_ALLOWED_USERS"] and 219 fasusername in app.config["ALLOWED_USERS"] 220 ) or not app.config["USE_ALLOWED_USERS"]): 221 222 username = fed_raw_name(resp.identity_url) 223 user = models.User.query.filter( 224 models.User.username == username).first() 225 if not user: # create if not created already 226 user = create_user_wrapper(username, resp.email, resp.timezone) 227 else: 228 user.mail = resp.email 229 user.timezone = resp.timezone 230 if "lp" in resp.extensions: 231 team_resp = resp.extensions['lp'] # name space for the teams extension 232 user.openid_groups = {"fas_groups": team_resp.teams} 233 234 db.session.add(user) 235 db.session.commit() 236 flask.flash(u"Welcome, {0}".format(user.name), "success") 237 flask.g.user = user 238 239 if flask.request.url_root == oid.get_next_url(): 240 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user", 241 username=user.name)) 242 return flask.redirect(oid.get_next_url()) 243 else: 244 flask.flash("User '{0}' is not allowed".format(fasusername)) 245 return flask.redirect(oid.get_next_url())
246
247 248 @misc.route("/logout/") 249 -def logout():
250 flask.session.pop("openid", None) 251 flask.session.pop("krb5_login", None) 252 flask.flash(u"You were signed out") 253 return flask.redirect(oid.get_next_url())
254
255 256 -def api_login_required(f):
257 @functools.wraps(f) 258 def decorated_function(*args, **kwargs): 259 token = None 260 apt_login = None 261 if "Authorization" in flask.request.headers: 262 base64string = flask.request.headers["Authorization"] 263 base64string = base64string.split()[1].strip() 264 userstring = base64.b64decode(base64string) 265 (apt_login, token) = userstring.decode("utf-8").split(":") 266 token_auth = False 267 if token and apt_login: 268 user = UsersLogic.get_by_api_login(apt_login).first() 269 if (user and user.api_token == token and 270 user.api_token_expiration >= datetime.date.today()): 271 272 if user.proxy and "username" in flask.request.form: 273 user = UsersLogic.get(flask.request.form["username"]).first() 274 275 token_auth = True 276 flask.g.user = user 277 if not token_auth: 278 url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"] 279 url = helpers.fix_protocol_for_frontend(url) 280 281 output = { 282 "output": "notok", 283 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url), 284 } 285 jsonout = flask.jsonify(output) 286 jsonout.status_code = 401 287 return jsonout 288 return f(*args, **kwargs)
289 return decorated_function 290
291 292 -def krb5_login_redirect(next=None):
293 krbc = app.config['KRB5_LOGIN'] 294 for key in krbc: 295 # Pick the first one for now. 296 return flask.redirect(flask.url_for("misc.krb5_login", 297 name=krbc[key]['URI'], 298 next=next)) 299 flask.flash("Unable to pick krb5 login page", "error") 300 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
301
302 303 -def login_required(role=helpers.RoleEnum("user")):
304 def view_wrapper(f): 305 @functools.wraps(f) 306 def decorated_function(*args, **kwargs): 307 if flask.g.user is None: 308 return flask.redirect(flask.url_for("misc.login", 309 next=flask.request.url)) 310 311 if role == helpers.RoleEnum("admin") and not flask.g.user.admin: 312 flask.flash("You are not allowed to access admin section.") 313 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 314 315 return f(*args, **kwargs)
316 return decorated_function 317 # hack: if login_required is used without params, the "role" parameter 318 # is in fact the decorated function, so we need to return 319 # the wrapped function, not the wrapper 320 # proper solution would be to use login_required() with parentheses 321 # everywhere, even if they"re empty - TODO 322 if callable(role): 323 return view_wrapper(role) 324 else: 325 return view_wrapper 326
327 328 # backend authentication 329 -def backend_authenticated(f):
330 @functools.wraps(f) 331 def decorated_function(*args, **kwargs): 332 auth = flask.request.authorization 333 if not auth or auth.password != app.config["BACKEND_PASSWORD"]: 334 return "You have to provide the correct password\n", 401 335 336 return f(*args, **kwargs)
337 return decorated_function 338
339 340 -def intranet_required(f):
341 @functools.wraps(f) 342 def decorated_function(*args, **kwargs): 343 ip_addr = IPAddress(flask.request.remote_addr) 344 accept_ranges = set(app.config.get("INTRANET_IPS", [])) 345 accept_ranges.add("127.0.0.1") # always accept from localhost 346 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges): 347 return ("Stats can be update only from intranet hosts, " 348 "not {}, check config\n".format(flask.request.remote_addr)), 403 349 350 return f(*args, **kwargs)
351 return decorated_function 352
353 354 -def req_with_copr(f):
355 @wraps(f) 356 def wrapper(**kwargs): 357 coprname = kwargs.pop("coprname") 358 if "group_name" in kwargs: 359 group_name = kwargs.pop("group_name") 360 copr = ComplexLogic.get_group_copr_safe(group_name, coprname, with_mock_chroots=True) 361 else: 362 username = kwargs.pop("username") 363 copr = ComplexLogic.get_copr_safe(username, coprname, with_mock_chroots=True) 364 return f(copr, **kwargs)
365 return wrapper 366
367 368 @misc.route("/migration-report/") 369 @misc.route("/migration-report/<username>") 370 -def coprs_migration_report(username=None):
371 if not username and not flask.g.user: 372 return generic_error("You are not logged in") 373 elif not username: 374 username = flask.g.user.name 375 user = UsersLogic.get(username).first() 376 377 coprs = CoprsLogic.filter_without_group_projects(CoprsLogic.get_multiple_owned_by_username(username)).all() 378 for group in UsersLogic.get_groups_by_fas_names_list(user.user_teams).all(): 379 coprs.extend(CoprsLogic.get_multiple_by_group_id(group.id).all()) 380 381 return render_migration_report(coprs, user=user)
382
383 384 @misc.route("/migration-report/g/<group_name>") 385 -def group_coprs_migration_report(group_name=None):
386 group = ComplexLogic.get_group_by_name_safe(group_name) 387 coprs = CoprsLogic.get_multiple_by_group_id(group.id) 388 return render_migration_report(coprs, group=group)
389
390 391 -def render_migration_report(coprs, user=None, group=None):
392 return flask.render_template("migration-report.html", 393 user=user, 394 group=group, 395 coprs=coprs)
396
397 398 -def send_build_icon(build):
399 if not build: 400 return send_file("static/status_images/unknown.png", 401 mimetype='image/png') 402 403 if build.state in ["importing", "pending", "starting", "running"]: 404 # The icon is about to change very soon, disable caches: 405 # https://help.github.com/articles/about-anonymized-image-urls/ 406 response = send_file("static/status_images/in_progress.png", 407 mimetype='image/png') 408 response.headers['Cache-Control'] = 'no-cache' 409 return response 410 411 if build.state in ["succeeded", "skipped"]: 412 return send_file("static/status_images/succeeded.png", 413 mimetype='image/png') 414 415 if build.state == "failed": 416 return send_file("static/status_images/failed.png", 417 mimetype='image/png') 418 419 return send_file("static/status_images/unknown.png", 420 mimetype='image/png')
421