1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import os
19 import string
20 import time
21
22 from twisted.web import server, http
23 from twisted.web.resource import Resource
24 from twisted.internet import defer, reactor, error
25 from twisted.cred import credentials
26 from zope.interface import implements
27
28 from flumotion.common import log, messages, errors, netutils, interfaces
29 from flumotion.common.i18n import N_, gettexter
30 from flumotion.component import component
31 from flumotion.component.base import http as httpbase
32 from flumotion.component.component import moods
33 from flumotion.component.misc.httpserver import httpfile, \
34 localprovider, localpath
35 from flumotion.component.misc.httpserver import serverstats
36 from flumotion.component.misc.porter import porterclient
37 from flumotion.twisted import fdserver
38
39 __version__ = "$Rev$"
40 T_ = gettexter()
41
42 UPTIME_UPDATE_INTERVAL = 5
43
44 FILEPROVIDER_SOCKET = 'flumotion.component.misc.httpserver' \
45 '.fileprovider.FileProviderPlug'
46
47
49
51 server.Request.__init__(self, channel, queued)
52 now = time.time()
53 self.lastTimeWritten = now
54
55
56
57 self.fd = self.transport.fileno()
58
59 self._component = channel.factory.component
60 self._transfer = None
61 self._provider = None
62 self._startTime = now
63 self._completionTime = None
64 self._rangeFirstByte = None
65 self._rangeLastByte = None
66 self._resourceSize = None
67 self._bytesWritten = 0L
68
69
70 self.stats = serverstats.RequestStatistics(self._component.stats)
71
72 self._component.requestStarted(self)
73
75 self._rangeFirstByte = first
76 self._rangeLastByte = last
77 self._resourceSize = size
78
86
95
101
110
112 headers = self.getAllHeaders()
113 duration = (self._completionTime or time.time()) - self._startTime
114 requestFields = {'ip': self.getClientIP(),
115 'method': self.method,
116 'uri': self.uri,
117 'get-parameters': self.args,
118 'clientproto': self.clientproto,
119 'response': self.code,
120 'bytes-sent': self._bytesWritten,
121 'referer': headers.get('referer', None),
122 'user-agent': headers.get('user-agent', None),
123 'time-connected': duration,
124 'resource-size': self._resourceSize,
125 'range-first': self._rangeFirstByte,
126 'range-last': self._rangeLastByte}
127 if self._provider:
128
129 providerFields = self._provider.getLogFields()
130 providerFields.update(requestFields)
131 requestFields = providerFields
132 return requestFields
133
134
135 -class Site(server.Site):
142
143
145 """
146 I wrap a statistics ui state entry, to allow updates.
147 """
148
152
153 - def update(self, name, value):
156
157
159
165
167 """
168 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
169 """
170 return self.callRemote('authenticate', bouncerName, keycard)
171
172 - def keepAlive(self, bouncerName, issuerName, ttl):
173 """
174 @rtype: L{twisted.internet.defer.Deferred}
175 """
176 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
177
179 """
180 @rtype: L{twisted.internet.defer.Deferred}
181 """
182 return self.callRemote('removeKeycardId', bouncerName, keycardId)
183
186
189
192
195
198
201
205
206
208 implements(interfaces.IStreamingComponent)
209
210 componentMediumClass = HTTPFileMedium
211
212 REQUEST_TIMEOUT = 30
213
214
216 self.mountPoint = None
217 self.type = None
218 self.port = None
219 self.hostname = None
220 self.stats = None
221 self._rateControlPlug = None
222 self._fileProviderPlug = None
223 self._metadataProviderPlug = None
224 self._loggers = []
225 self._requestModifiers = []
226 self._logfilter = None
227 self.httpauth = None
228 self._startTime = time.time()
229 self._uptimeCallId = None
230 self._allowBrowsing = False
231
232 self._description = 'On-Demand Flumotion Stream'
233
234 self._singleFile = False
235 self._connected_clients = {}
236 self._total_bytes_written = 0
237
238 self._pbclient = None
239
240 self._twistedPort = None
241 self._timeoutRequestsCallLater = None
242
243 self._pendingDisconnects = {}
244 self._rootResource = None
245
246
247
248 self._mimeToResource = {
249 'video/x-flv': httpfile.FLVFile,
250 'video/mp4': httpfile.MP4File,
251 }
252
253 self.uiState.addKey('stream-url', None)
254 self.uiState.addKey('server-uptime', 0)
255 self.uiState.addKey('file-provider', None)
256 self.uiState.addKey('allow-browsing', False)
257 self.uiState.addDictKey('request-statistics')
258 self.uiState.addDictKey('provider-statistics')
259
261 props = self.config['properties']
262 self.fixRenamedProperties(props, [
263 ('issuer', 'issuer-class'),
264 ('porter_socket_path', 'porter-socket-path'),
265 ('porter_username', 'porter-username'),
266 ('porter_password', 'porter-password'),
267 ('mount_point', 'mount-point')])
268
269 path = props.get('path', None)
270 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
271 if plugs:
272 if path:
273 self.warning("The component property 'path' should not be used"
274 " in conjunction with a file provider plug.")
275
276
277
278
279
280
281 if props.get('type', 'master') == 'slave':
282 for k in 'socket-path', 'username', 'password':
283 if not 'porter-' + k in props:
284 msg = 'slave mode, missing required property porter-%s' % k
285 return defer.fail(errors.ConfigError(msg))
286 if plugs or not path:
287 return
288 if os.path.isfile(path):
289 self._singleFile = True
290 elif os.path.isdir(path):
291 self._singleFile = False
292 else:
293 msg = "the file or directory specified in 'path': %s does " \
294 "not exist or is neither a file nor directory" % path
295 return defer.fail(errors.ConfigError(msg))
296
298 desc = props.get('description', None)
299 if desc:
300 self._description = desc
301
302
303 mountPoint = props.get('mount-point', '/')
304 if not mountPoint.startswith('/'):
305 mountPoint = '/' + mountPoint
306 self.mountPoint = mountPoint
307 self.hostname = props.get('hostname', None)
308 if not self.hostname:
309 self.hostname = netutils.guess_public_hostname()
310
311 self.type = props.get('type', 'master')
312 self.port = props.get('port', 8801)
313 self._allowBrowsing = props.get('allow-browsing', False)
314 if self.type == 'slave':
315
316 self._porterPath = props['porter-socket-path']
317 self._porterUsername = props['porter-username']
318 self._porterPassword = props['porter-password']
319 socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
320 self._loggers = self.plugs.get(socket, [])
321 socket = \
322 'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
323 self._requestModifiers = self.plugs.get(socket, [])
324
325 self.httpauth = httpbase.HTTPAuthentication(self)
326 if 'avatarId' in self.config:
327 self.httpauth.setRequesterId(self.config['avatarId'])
328 if 'bouncer' in props:
329 self.httpauth.setBouncerName(props['bouncer'])
330 if 'issuer-class' in props:
331 self.warning("The component property 'issuer-class' has been"
332 "deprecated.")
333 msg = messages.Warning(T_(N_(
334 "The component property 'issuer-class' has "
335 "been deprecated.")))
336 self.addMessage(msg)
337
338 if 'allow-default' in props:
339 self.httpauth.setAllowDefault(props['allow-default'])
340 if 'ip-filter' in props:
341 logFilter = http.LogFilter()
342 for f in props['ip-filter']:
343 logFilter.addIPFilter(f)
344 self._logfilter = logFilter
345 socket = \
346 'flumotion.component.misc.httpserver.ratecontrol.RateControllerPlug'
347 plugs = self.plugs.get(socket, [])
348 if plugs:
349
350 self._rateControlPlug = self.plugs[socket][-1]
351
352 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
353 if plugs:
354
355 self._fileProviderPlug = plugs[-1]
356 else:
357
358
359 plugProps = {"properties": {"path": props.get('path', None)}}
360 self._fileProviderPlug = localprovider.FileProviderLocalPlug(
361 plugProps)
362
363 socket = ('flumotion.component.misc.httpserver'
364 '.metadataprovider.MetadataProviderPlug')
365 plugs = self.plugs.get(socket, [])
366 if plugs:
367 self._metadataProviderPlug = plugs[-1]
368
369
370 self.uiState.set('stream-url', self.getUrl())
371 self.uiState.set('allow-browsing', self._allowBrowsing)
372
374 self.have_properties(self.config['properties'])
375
376 root = self._rootResource
377 if root is None:
378 root = self._getDefaultRootResource()
379
380 if root is None:
381 raise errors.WrongStateError(
382 "a resource or path property must be set")
383
384 site = Site(root, self)
385 self._timeoutRequestsCallLater = reactor.callLater(
386 self.REQUEST_TIMEOUT, self._timeoutRequests)
387
388
389 self.stats = serverstats.ServerStatistics()
390 updater = StatisticsUpdater(self.uiState, "request-statistics")
391 self.stats.startUpdates(updater)
392 updater = StatisticsUpdater(self.uiState, "provider-statistics")
393 self._fileProviderPlug.startStatsUpdates(updater)
394 self._updateUptime()
395
396 d = defer.Deferred()
397 if self.type == 'slave':
398
399 if self._singleFile:
400 self._pbclient = porterclient.HTTPPorterClientFactory(
401 site, [self.mountPoint], d)
402 else:
403 self._pbclient = porterclient.HTTPPorterClientFactory(
404 site, [], d,
405 prefixes=[self.mountPoint])
406 creds = credentials.UsernamePassword(self._porterUsername,
407 self._porterPassword)
408 self._pbclient.startLogin(creds, self._pbclient.medium)
409 self.info("Logging to porter on socketPath %s", self._porterPath)
410
411 c = fdserver.FDConnector(self._porterPath, self._pbclient, 10,
412 checkPID=False, reactor=reactor)
413 c.connect()
414 else:
415
416 try:
417 self.debug('Going to listen on port %d' % self.port)
418 iface = ""
419
420
421 self._twistedPort = reactor.listenTCP(self.port,
422 site, interface=iface)
423 self.port = self._twistedPort.getHost().port
424 self.info('Listening on interface %r on port %d',
425 iface, self.port)
426 except error.CannotListenError:
427 t = 'Port %d is not available.' % self.port
428 self.warning(t)
429 m = messages.Error(T_(N_(
430 "Network error: TCP port %d is not available."),
431 self.port))
432 self.addMessage(m)
433 self.setMood(moods.sad)
434 return defer.fail(errors.ComponentSetupHandledError(t))
435
436 d.callback(None)
437
438
439 def setComponentHappy(result):
440 self.httpauth.scheduleKeepAlive()
441 self.setMood(moods.happy)
442 return result
443 d.addCallback(setComponentHappy)
444 return d
445
447 if self.stats:
448 self.stats.stopUpdates()
449 if self._fileProviderPlug:
450 self._fileProviderPlug.stopStatsUpdates()
451 if self.httpauth:
452 self.httpauth.stopKeepAlive()
453 if self._timeoutRequestsCallLater:
454 self._timeoutRequestsCallLater.cancel()
455 self._timeoutRequestsCallLater = None
456 if self._uptimeCallId:
457 self._uptimeCallId.cancel()
458 self._uptimeCallId = None
459 if self._twistedPort:
460 self._twistedPort.stopListening()
461
462 l = [self.remove_all_clients()]
463 if self.type == 'slave' and self._pbclient:
464 if self._singleFile:
465 l.append(self._pbclient.deregisterPath(self.mountPoint))
466 else:
467 l.append(self._pbclient.deregisterPrefix(self.mountPoint))
468 return defer.DeferredList(l)
469
471 """
472 Provide a new set of porter login information, for when we're in slave
473 mode and the porter changes.
474 If we're currently connected, this won't disconnect - it'll just change
475 the information so that next time we try and connect we'll use the
476 new ones
477 @param path: new path
478 @param username: new username
479 @param password: new password
480 """
481 if self.type != 'slave':
482 raise errors.WrongStateError(
483 "Can't specify porter details in master mode")
484
485 self._porterUsername = username
486 self._porterPassword = password
487
488 creds = credentials.UsernamePassword(self._porterUsername,
489 self._porterPassword)
490 self._pbclient.startLogin(creds, self.medium)
491
492 self._updatePath(path)
493
495
496 if path == self._porterPath:
497 return
498 self._porterPath = path
499
500
501 self._pbclient.stopTrying()
502
503 self._pbclient.resetDelay()
504 c = fdserver.FDConnector(self._porterPath, self._pbclient, 10,
505 checkPID=False, reactor=reactor)
506 c.connect()
507
529
531 node = self._fileProviderPlug.getRootPath()
532 if node is None:
533 return None
534
535 self.debug('Starting with mount point "%s"' % self.mountPoint)
536 factory = httpfile.MimedFileFactory(self.httpauth,
537 mimeToResource=self._mimeToResource,
538 rateController=self._rateControlPlug,
539 requestModifiers=self._requestModifiers,
540 metadataProvider=self._metadataProviderPlug)
541
542 root = factory.create(node)
543 if self.mountPoint != '/':
544 root = self._createRootResourceForPath(self.mountPoint, root)
545
546 return root
547
549 if path.endswith('/'):
550 path = path[:-1]
551
552 root = Resource()
553 children = string.split(path[1:], '/')
554 parent = root
555 for child in children[:-1]:
556 resource = Resource()
557 self.debug("Putting Resource at %s", child)
558 parent.putChild(child, resource)
559 parent = resource
560 self.debug("Putting resource %r at %r", fileResource, children[-1])
561 parent.putChild(children[-1], fileResource)
562 return root
563
565 """
566 Remove a client when requested.
567
568 Used by keycard expiry.
569 """
570 if fd in self._connected_clients:
571 request = self._connected_clients[fd]
572 self.debug("Removing client for fd %d", fd)
573 request.unregisterProducer()
574 request.channel.transport.loseConnection()
575 else:
576 self.debug("No client with fd %d found", fd)
577
591
593
594 fd = request.transport.fileno()
595 self._connected_clients[fd] = request
596 self.debug("[fd %5d] (ts %f) request %r started",
597 fd, time.time(), request)
598
600
601
602 self.debug('[fd %5d] (ts %f) finishing request %r',
603 request.transport.fileno(), time.time(), request)
604
605 self.httpauth.cleanupAuth(fd)
606 ip = request.getClientIP()
607 if not self._logfilter or not self._logfilter.isInRange(ip):
608 fields = request.getLogFields()
609 fields.update({'time': time.gmtime(),
610 'username': '-'})
611 l = []
612 for logger in self._loggers:
613 l.append(defer.maybeDeferred(
614 logger.event, 'http_session_completed', fields))
615 d = defer.DeferredList(l)
616 else:
617 d = defer.succeed(None)
618
619 del self._connected_clients[fd]
620
621 self._total_bytes_written += bytesWritten
622
623 def firePendingDisconnect(_):
624 self.debug("Logging completed")
625 if fd in self._pendingDisconnects:
626 pending = self._pendingDisconnects.pop(fd)
627 self.debug("Firing pending disconnect deferred")
628 pending.callback(None)
629
630
631 self.debug('[fd %5d] (ts %f) finished request %r',
632 fd, time.time(), request)
633
634 d.addCallback(firePendingDisconnect)
635
637 return self._description
638
640 port = self.port
641
642 if self.type == 'slave' and self._pbclient:
643 if not self._pbclient.remote_port:
644 return ""
645 port = self._pbclient.remote_port
646
647 if (not port) or (port == 80):
648 port_str = ""
649 else:
650 port_str = ":%d" % port
651
652 return "http://%s%s%s" % (self.hostname, port_str, self.mountPoint)
653
655 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
656 if socket in self.plugs:
657 plug = self.plugs[socket][-1]
658 return plug.getStreamData()
659 else:
660 return {'protocol': 'HTTP',
661 'description': self._description,
662 'url': self.getUrl()}
663
665 """
666 Return the number of connected clients
667 """
668 return len(self._connected_clients)
669
671 """
672 Current Bandwidth
673 """
674 bytesTransferred = self._total_bytes_written
675 for request in self._connected_clients.values():
676 if request._transfer:
677 bytesTransferred += request._transfer.bytesWritten
678 return bytesTransferred
679
681 """
682 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
683 current_clients, current_load) of our current bandwidth and
684 user values. The deltas and current_load are NOT currently
685 implemented here, we set them as zero.
686 """
687 return (0, 0, self.getBytesSent(), self.getClients(), 0)
688
690 """
691 Close the logfile, then reopen using the previous logfilename
692 """
693 for logger in self._loggers:
694 self.debug('rotating logger %r' % logger)
695 logger.rotate()
696
698 """Attaches a root resource to this component. The root resource is the
699 once which will be used when accessing the mount point.
700 This is normally called from a plugs start() method.
701 @param resource: root resource
702 @type resource: L{twisted.web.resource.Resource}
703 """
704 rootResource = self._createRootResourceForPath(
705 self.getMountPoint(), resource)
706
707 self._rootResource = rootResource
708
710 """Get the mount point of this component
711 @returns: the mount point
712 """
713
714 return self.config['properties'].get('mount-point')
715
721