1 import base64
2 import datetime
3 import functools
4 from functools import wraps, partial
5
6 from netaddr import IPAddress, IPNetwork
7 import re
8 import flask
9
10 from openid_teams.teams import TeamsRequest
11
12 from coprs import app
13 from coprs import db
14 from coprs import helpers
15 from coprs import models
16 from coprs import oid
17 from coprs.logic.complex_logic import ComplexLogic
18 from coprs.logic.users_logic import UsersLogic
19 from coprs.logic.coprs_logic import CoprsLogic
23 """
24 Create proper Fedora OpenID name from short name.
25
26 >>> fedoraoid == fed_openidize_name(user.name)
27 True
28 """
29
30 return "http://{0}.id.fedoraproject.org/".format(name)
31
47
50 return oidname.replace(".id.fedoraproject.org/", "") \
51 .replace("http://", "")
52
66
71
84
85
@app.errorhandler(404)
def page_not_found(message):
    """
    Render the "not found" page for a 404 error.

    :param message: value handed to the template as ``message``
    :return: tuple of the rendered ``404.html`` template and the 404 status
    """
    rendered_page = flask.render_template("404.html", message=message)
    return rendered_page, 404
89
94
97 """
98 :type message: str
99 :type err: CoprHttpException
100 """
101 return flask.render_template("_error.html",
102 message=message,
103 error_code=code,
104 error_title=title), code
105
106
# Specialised variants of generic_error with the status code and title
# already bound as keyword arguments.
server_error_handler = functools.partial(
    generic_error, title="Internal Server Error", code=500)
bad_request_handler = functools.partial(
    generic_error, title="Bad Request", code=400)

# Register each variant with the application for its HTTP status code.
app.errorhandler(500)(server_error_handler)
app.errorhandler(400)(bad_request_handler)

# Blueprint on which the "misc" routes in this module are declared.
misc = flask.Blueprint("misc", __name__)
114
115
116 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
118 """
119 Handle the Kerberos authentication.
120
121 Note that if we are able to get here, either the user is authenticated
122 correctly, or apache is mis-configured and it does not perform KRB
123 authentication at all. Note also, even if that can be considered ugly, we
124 are reusing oid's get_next_url feature with kerberos login.
125 """
126
127
128 if flask.g.user is not None:
129 return flask.redirect(oid.get_next_url())
130
131 krb_config = app.config['KRB5_LOGIN']
132
133 found = None
134 for key in krb_config.keys():
135 if krb_config[key]['URI'] == name:
136 found = key
137 break
138
139 if not found:
140
141 return flask.render_template("404.html"), 404
142
143 if 'REMOTE_USER' not in flask.request.environ:
144 nocred = "Kerberos authentication failed (no credentials provided)"
145 return flask.render_template("403.html", message=nocred), 403
146
147 krb_username = flask.request.environ['REMOTE_USER']
148 app.logger.debug("krb5 login attempt: " + krb_username)
149 username = krb_straighten_username(krb_username)
150 if not username:
151 message = "invalid krb5 username: " + krb_username
152 return flask.render_template("403.html", message=message), 403
153
154 krb_login = (
155 models.Krb5Login.query
156 .filter(models.Krb5Login.config_name == key)
157 .filter(models.Krb5Login.primary == username)
158 .first()
159 )
160 if krb_login:
161 flask.g.user = krb_login.user
162 flask.session['krb5_login'] = krb_login.user.name
163 flask.flash(u"Welcome, {0}".format(flask.g.user.name))
164 return flask.redirect(oid.get_next_url())
165
166
167 user = models.User.query.filter(models.User.username == username).first()
168 if not user:
169
170 email = username + "@" + krb_config[key]['email_domain']
171 user = create_user_wrapper(username, email)
172 db.session.add(user)
173
174 krb_login = models.Krb5Login(user=user, primary=username, config_name=key)
175 db.session.add(krb_login)
176 db.session.commit()
177
178 flask.flash(u"Welcome, {0}".format(user.name))
179 flask.g.user = user
180 flask.session['krb5_login'] = user.name
181 return flask.redirect(oid.get_next_url())
182
183
@misc.route("/login/", methods=["GET"])
@oid.loginhandler
def login():
    """
    Start an OpenID login unless a user is already signed in.

    :return: a redirect to ``oid.get_next_url()`` when ``flask.g.user`` is
        set, otherwise whatever ``oid.try_login`` returns for the
        id.fedoraproject.org provider
    """
    # Already authenticated: nothing to do, continue to the next URL.
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    # Ask the provider for e-mail and timezone, and attach the teams
    # extension requesting "_FAS_ALL_GROUPS_".
    groups_request = TeamsRequest(["_FAS_ALL_GROUPS_"])
    return oid.try_login(
        "https://id.fedoraproject.org/",
        ask_for=["email", "timezone"],
        extensions=[groups_request])
195
199 flask.session["openid"] = resp.identity_url
200 fasusername = resp.identity_url.replace(
201 ".id.fedoraproject.org/", "").replace("http://", "")
202
203
204 if fasusername and (
205 (
206 app.config["USE_ALLOWED_USERS"] and
207 fasusername in app.config["ALLOWED_USERS"]
208 ) or not app.config["USE_ALLOWED_USERS"]):
209
210 username = fed_raw_name(resp.identity_url)
211 user = models.User.query.filter(
212 models.User.username == username).first()
213 if not user:
214 user = create_user_wrapper(username, resp.email, resp.timezone)
215 else:
216 user.mail = resp.email
217 user.timezone = resp.timezone
218 if "lp" in resp.extensions:
219 team_resp = resp.extensions['lp']
220 user.openid_groups = {"fas_groups": team_resp.teams}
221
222 db.session.add(user)
223 db.session.commit()
224 flask.flash(u"Welcome, {0}".format(user.name))
225 flask.g.user = user
226
227 if flask.request.url_root == oid.get_next_url():
228 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
229 username=user.name))
230 return flask.redirect(oid.get_next_url())
231 else:
232 flask.flash("User '{0}' is not allowed".format(fasusername))
233 return flask.redirect(oid.get_next_url())
234
235
@misc.route("/logout/")
def logout():
    """
    Sign the current user out.

    Removes the ``openid`` and ``krb5_login`` entries from the session (a
    missing entry is ignored), flashes a confirmation and redirects.

    :return: a redirect to ``oid.get_next_url()``
    """
    for session_key in ("openid", "krb5_login"):
        flask.session.pop(session_key, None)

    flask.flash(u"You were signed out")
    return flask.redirect(oid.get_next_url())
242
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Authenticate the request via its ``Authorization`` header, then call
    the wrapped view ``f``.

    The second whitespace-separated word of the header is base64-decoded
    and read as ``<api_login>:<api_token>``. The request is accepted when a
    user with that API login exists, the token matches and the token has
    not expired; that user is then stored in ``flask.g.user``.

    A missing or malformed header is treated like wrong credentials rather
    than raising an unhandled exception.

    :return: the result of ``f(*args, **kwargs)`` on success, otherwise a
        JSON response ``{"output": "notok", "error": ...}`` with status 500
    """
    api_login = None
    token = None

    header_value = flask.request.headers.get("Authorization")
    if header_value:
        try:
            encoded = header_value.split()[1]
            decoded = base64.b64decode(encoded).decode("utf-8")
        except (IndexError, TypeError, ValueError):
            # IndexError: header has a single word only.
            # ValueError covers binascii.Error (bad base64) and
            # UnicodeDecodeError; TypeError is what Python 2's b64decode
            # raises on bad padding.
            decoded = ""
        # Split on the first ":" only, so a token containing ":" does not
        # break the unpacking; without any ":" the token stays empty.
        api_login, _, token = decoded.partition(":")

    token_auth = False
    if token and api_login:
        user = UsersLogic.get_by_api_login(api_login).first()
        if (user and user.api_token == token and
                user.api_token_expiration >= datetime.date.today()):
            token_auth = True
            flask.g.user = user

    if not token_auth:
        output = {
            "output": "notok",
            "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(app.config["FRONTEND_BASE_URL"]),
        }
        jsonout = flask.jsonify(output)
        # Status code deliberately left unchanged for backward compatibility.
        jsonout.status_code = 500
        return jsonout

    return f(*args, **kwargs)
271 return decorated_function
272
def view_wrapper(f):
    """
    Wrap view ``f`` so that it runs only for a signed-in user.

    ``role`` is read from the enclosing scope: when it equals
    ``helpers.RoleEnum("admin")`` the signed-in user must also have the
    ``admin`` flag set.

    :param f: the view function to protect
    :return: the wrapping function
    """
    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        # Anonymous visitor: send to the login page, remembering where
        # the request was headed.
        if flask.g.user is None:
            login_url = flask.url_for("misc.login", next=flask.request.url)
            return flask.redirect(login_url)

        # Admin-only view requested by a non-admin user.
        admin_required = role == helpers.RoleEnum("admin")
        if admin_required and not flask.g.user.admin:
            flask.flash("You are not allowed to access admin section.")
            home_url = flask.url_for("coprs_ns.coprs_show")
            return flask.redirect(home_url)

        return f(*args, **kwargs)

    return guarded_view
288
289
290
291
292
293 if callable(role):
294 return view_wrapper(role)
295 else:
296 return view_wrapper
297
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Call the wrapped view ``f`` only when the request carries the backend
    password.

    The password from the request's authorization data is compared with
    ``app.config["BACKEND_PASSWORD"]``.

    :return: the result of ``f(*args, **kwargs)`` on a match, otherwise a
        plain-text message with status 401
    """
    credentials = flask.request.authorization
    if credentials and credentials.password == app.config["BACKEND_PASSWORD"]:
        return f(*args, **kwargs)

    return "You have to provide the correct password\n", 401
309
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Call the wrapped view ``f`` only for requests from an accepted address.

    Accepted addresses/networks are the ``INTRANET_IPS`` config entries
    (none when the option is unset) plus ``127.0.0.1``.

    :return: the result of ``f(*args, **kwargs)`` when the remote address
        is inside an accepted range, otherwise a plain-text message with
        status 403
    """
    client_ip = IPAddress(flask.request.remote_addr)

    allowed = set(app.config.get("INTRANET_IPS", []))
    allowed.add("127.0.0.1")

    # Stop at the first range that contains the client address.
    permitted = False
    for candidate in allowed:
        if client_ip in IPNetwork(candidate):
            permitted = True
            break

    if permitted:
        return f(*args, **kwargs)

    refusal = ("Stats can be update only from intranet hosts, "
               "not {}, check config\n".format(flask.request.remote_addr))
    return refusal, 403
323
336 return wrapper
337
338
339 @misc.route("/migration-report/")
340 @misc.route("/migration-report/<username>")
341 -def coprs_migration_report(username=None):
353
360
367