1 import base64
2 import datetime
3 import functools
4 from functools import wraps, partial
5
6 from netaddr import IPAddress, IPNetwork
7 import re
8 import flask
9
10 from openid_teams.teams import TeamsRequest
11
12 from coprs import app
13 from coprs import db
14 from coprs import helpers
15 from coprs import models
16 from coprs import oid
17 from coprs.logic.complex_logic import ComplexLogic
18 from coprs.logic.users_logic import UsersLogic
19 from coprs.logic.coprs_logic import CoprsLogic
23 """
24 Create proper Fedora OpenID name from short name.
25
26 >>> fedoraoid == fed_openidize_name(user.name)
27 True
28 """
29
30 return "http://{0}.id.fedoraproject.org/".format(name)
31
47
50 return oidname.replace(".id.fedoraproject.org/", "") \
51 .replace("http://", "")
52
55 return re.sub(r'@.*', '', fullname)
56
61
74
75
76 @app.errorhandler(404)
77 -def page_not_found(message):
78 return flask.render_template("404.html", message=message), 404
79
84
87 """
88 :type message: str
89 :type err: CoprHttpException
90 """
91 return flask.render_template("_error.html",
92 message=message,
93 error_code=code,
94 error_title=title), code
95
96
97 server_error_handler = partial(generic_error, code=500, title="Internal Server Error")
98 bad_request_handler = partial(generic_error, code=400, title="Bad Request")
99
100 app.errorhandler(500)(server_error_handler)
101 app.errorhandler(400)(bad_request_handler)
102
103 misc = flask.Blueprint("misc", __name__)
104
105
106 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
108 """
109 Handle the Kerberos authentication.
110
111 Note that if we are able to get here, either the user is authenticated
112 correctly, or apache is mis-configured and it does not perform KRB
113 authentication at all. Note also, even if that can be considered ugly, we
114 are reusing oid's get_next_url feature with kerberos login.
115 """
116
117
118 if flask.g.user is not None:
119 return flask.redirect(oid.get_next_url())
120
121 krb_config = app.config['KRB5_LOGIN']
122
123 found = None
124 for key in krb_config.keys():
125 if krb_config[key]['URI'] == name:
126 found = key
127 break
128
129 if not found:
130
131 return flask.render_template("404.html"), 404
132
133 if 'REMOTE_USER' not in flask.request.environ:
134 nocred = "Kerberos authentication failed (no credentials provided)"
135 return flask.render_template("403.html", message=nocred), 403
136
137 krb_username = flask.request.environ['REMOTE_USER']
138 username = krb_strip_realm(krb_username)
139
140 krb_login = (
141 models.Krb5Login.query
142 .filter(models.Krb5Login.config_name == key)
143 .filter(models.Krb5Login.primary == username)
144 .first()
145 )
146 if krb_login:
147 flask.g.user = krb_login.user
148 flask.session['krb5_login'] = krb_login.user.name
149 flask.flash(u"Welcome, {0}".format(flask.g.user.name))
150 return flask.redirect(oid.get_next_url())
151
152
153 user = models.User.query.filter(models.User.username == username).first()
154 if not user:
155
156 email = username + "@" + krb_config[key]['email_domain']
157 user = create_user_wrapper(username, email)
158 db.session.add(user)
159
160 krb_login = models.Krb5Login(user=user, primary=username, config_name=key)
161 db.session.add(krb_login)
162 db.session.commit()
163
164 flask.flash(u"Welcome, {0}".format(user.name))
165 flask.g.user = user
166 flask.session['krb5_login'] = user.name
167 return flask.redirect(oid.get_next_url())
168
169
170 @misc.route("/login/", methods=["GET"])
171 @oid.loginhandler
172 -def login():
173 if flask.g.user is not None:
174 return flask.redirect(oid.get_next_url())
175 else:
176
177 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"])
178 return oid.try_login("https://id.fedoraproject.org/",
179 ask_for=["email", "timezone"],
180 extensions=[team_req])
181
185 flask.session["openid"] = resp.identity_url
186 fasusername = resp.identity_url.replace(
187 ".id.fedoraproject.org/", "").replace("http://", "")
188
189
190 if fasusername and (
191 (
192 app.config["USE_ALLOWED_USERS"] and
193 fasusername in app.config["ALLOWED_USERS"]
194 ) or not app.config["USE_ALLOWED_USERS"]):
195
196 username = fed_raw_name(resp.identity_url)
197 user = models.User.query.filter(
198 models.User.username == username).first()
199 if not user:
200 user = create_user_wrapper(username, resp.email, resp.timezone)
201 else:
202 user.mail = resp.email
203 user.timezone = resp.timezone
204 if "lp" in resp.extensions:
205 team_resp = resp.extensions['lp']
206 user.openid_groups = {"fas_groups": team_resp.teams}
207
208 db.session.add(user)
209 db.session.commit()
210 flask.flash(u"Welcome, {0}".format(user.name))
211 flask.g.user = user
212
213 if flask.request.url_root == oid.get_next_url():
214 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
215 username=user.name))
216 return flask.redirect(oid.get_next_url())
217 else:
218 flask.flash("User '{0}' is not allowed".format(fasusername))
219 return flask.redirect(oid.get_next_url())
220
221
222 @misc.route("/logout/")
223 -def logout():
224 flask.session.pop("openid", None)
225 flask.session.pop("krb5_login", None)
226 flask.flash(u"You were signed out")
227 return flask.redirect(oid.get_next_url())
228
231 @functools.wraps(f)
232 def decorated_function(*args, **kwargs):
233 token = None
234 apt_login = None
235 if "Authorization" in flask.request.headers:
236 base64string = flask.request.headers["Authorization"]
237 base64string = base64string.split()[1].strip()
238 userstring = base64.b64decode(base64string)
239 (apt_login, token) = userstring.decode("utf-8").split(":")
240 token_auth = False
241 if token and apt_login:
242 user = UsersLogic.get_by_api_login(apt_login).first()
243 if (user and user.api_token == token and
244 user.api_token_expiration >= datetime.date.today()):
245
246 token_auth = True
247 flask.g.user = user
248 if not token_auth:
249 output = {
250 "output": "notok",
251 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(app.config["PUBLIC_COPR_HOSTNAME"]),
252 }
253 jsonout = flask.jsonify(output)
254 jsonout.status_code = 500
255 return jsonout
256 return f(*args, **kwargs)
257 return decorated_function
258
261 def view_wrapper(f):
262 @functools.wraps(f)
263 def decorated_function(*args, **kwargs):
264 if flask.g.user is None:
265 return flask.redirect(flask.url_for("misc.login",
266 next=flask.request.url))
267
268 if role == helpers.RoleEnum("admin") and not flask.g.user.admin:
269 flask.flash("You are not allowed to access admin section.")
270 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
271
272 return f(*args, **kwargs)
273 return decorated_function
274
275
276
277
278
279 if callable(role):
280 return view_wrapper(role)
281 else:
282 return view_wrapper
283
287 @functools.wraps(f)
288 def decorated_function(*args, **kwargs):
289 auth = flask.request.authorization
290 if not auth or auth.password != app.config["BACKEND_PASSWORD"]:
291 return "You have to provide the correct password\n", 401
292
293 return f(*args, **kwargs)
294 return decorated_function
295
298 @functools.wraps(f)
299 def decorated_function(*args, **kwargs):
300 ip_addr = IPAddress(flask.request.remote_addr)
301 accept_ranges = set(app.config.get("INTRANET_IPS", []))
302 accept_ranges.add("127.0.0.1")
303 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges):
304 return ("Stats can be update only from intranet hosts, "
305 "not {}, check config\n".format(flask.request.remote_addr)), 403
306
307 return f(*args, **kwargs)
308 return decorated_function
309
322 return wrapper
323
324
325 @misc.route("/migration-report/")
326 @misc.route("/migration-report/<username>")
327 -def coprs_migration_report(username=None):
339
346
353