1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import time
25 import tempfile
26
27 import gobject
28 import gst
29
30 from twisted.internet import reactor
31
32 from flumotion.component import feedcomponent
33 from flumotion.common import log, gstreamer, pygobject, messages, errors
34 from flumotion.common import documentation, format
35 from flumotion.common import eventcalendar
36 from flumotion.common.i18n import N_, gettexter
37 from flumotion.common.mimetypes import mimeTypeToExtention
38 from flumotion.common.pygobject import gsignal
39
40 from flumotion.component.component import moods
41
42 __all__ = ['Disker']
43 __version__ = "$Rev: 8162 $"
44 T_ = gettexter()
45
"""
Disker has a property 'ical-schedule'. It allows an ical file to be
specified in the config so that recordings are scheduled based on the
events it contains. The file is monitored for changes, and the events
are reloaded whenever it changes.

The filename of a recording started from an ical file is produced by
passing the ical event summary through strftime, so that an archive
can encode the date and time at which it was begun.

The time given to strftime is expressed in the timezone of the ical
event. In practice this will be either UTC or the local time of the
machine running the disker, as the ical scheduler does not understand
arbitrary timezones.
"""
61
62
64
65
66
69
70
71
72
75
85
86
87
90
91
92 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
93 componentMediumClass = DiskerMedium
94 checkOffset = True
95 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
96 file = None
97 directory = None
98 location = None
99 caps = None
100
101 _startFilenameTemplate = None
102 _startTimeTuple = None
103 _rotateTimeDelayedCall = None
104 _symlinkToLastRecording = None
105 _symlinkToCurrentRecording = None
106
107
108
118
119
120
def get_pipeline_string(self, properties):
    """
    Validate the rotation properties and return the pipeline template.

    Stores properties['directory'] on the component, hands the
    ('rotateType', 'rotate-type') rename to fixRenamedProperties() and,
    when a rotate type is configured, passes its value to
    setSizeRotate() or setTimeRotate().

    @param properties: the component properties; must contain
                       'directory', may contain 'rotate-type' and the
                       'size' or 'time' value that goes with it.

    @returns: self.pipe_template
    @raises errors.ComponentSetupHandledError: when 'rotate-type' is not
        one of 'none', 'size' or 'time', or when the property named by
        it is missing. An error message is added to the component
        before raising.
    """
    self.directory = properties['directory']

    self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])

    rotateType = properties.get('rotate-type', 'none')

    # validate rotate-type and the property it depends on
    if rotateType not in ('none', 'size', 'time'):
        m = messages.Error(T_(N_(
            "The configuration property 'rotate-type' should be set to "
            "'size', 'time', or 'none', not '%s'. "
            "Please fix the configuration."),
                rotateType), mid='rotate-type')
        self.addMessage(m)
        raise errors.ComponentSetupHandledError()

    # the property carrying the limit has the same name as the rotate type
    if rotateType in ('size', 'time'):
        if rotateType not in properties:
            m = messages.Error(T_(N_(
                "The configuration property '%s' should be set. "
                "Please fix the configuration."),
                    rotateType), mid='rotate-type')
            self.addMessage(m)
            raise errors.ComponentSetupHandledError()

    # now act on the validated properties
    if rotateType == 'size':
        self.setSizeRotate(properties['size'])
    elif rotateType == 'time':
        self.setTimeRotate(properties['time'])

    return self.pipe_template
158
183
184 if not eventcalendar.HAS_ICALENDAR:
185 missingModule('icalendar')
186 if not eventcalendar.HAS_DATEUTIL:
187 missingModule('dateutil')
188
189 raise errors.ComponentSetupHandledError()
190
191 sink = self.get_element('fdsink')
192 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
193
194 sink.connect('client-removed', self._client_removed_cb)
195
196
197 react_to_marks = properties.get('react-to-stream-markers', False)
198 if react_to_marks:
199 pfx = properties.get('stream-marker-filename-prefix', '%03d.')
200 self._markerPrefix = pfx
201 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
202
203
204
213
def setSizeRotate(self, size):
    """
    Schedule self._rotateSizeCallLater(size) on the reactor.

    @param size: size of file (in bytes)
    """
    # seconds until the reactor makes the call
    delay = 5
    reactor.callLater(delay, self._rotateSizeCallLater, size)
219
226
236
def getMime(self):
    """
    Return the name of the first structure of the negotiated caps.

    @returns: the structure name, or None while self.caps is not set.
    """
    if not self.caps:
        return None
    structure = self.caps.get_structure(0)
    return structure.get_name()
240
241
242
244 mime = self.getMime()
245 if mime == 'multipart/x-mixed-replace':
246 mime += ";boundary=ThisRandomString"
247 return mime
248
280
282 """
283 @param filenameTemplate: strftime format string to decide filename
284 @param timeTuple: a valid time tuple to pass to strftime,
285 defaulting to time.localtime().
286 """
287 mime = self.getMime()
288 ext = mimeTypeToExtention(mime)
289
290 self.stopRecording()
291
292 sink = self.get_element('fdsink')
293 if sink.get_state() == gst.STATE_NULL:
294 sink.set_state(gst.STATE_READY)
295
296 filename = ""
297 if not filenameTemplate:
298 filenameTemplate = self._defaultFilenameTemplate
299 filename = "%s.%s" % (format.strftime(filenameTemplate,
300 timeTuple or time.localtime()), ext)
301 self.location = os.path.join(self.directory, filename)
302
303
304
305 location = self.location
306 i = 1
307 while os.path.exists(location):
308 mtimeTuple = time.gmtime(os.stat(location).st_mtime)
309 if mtimeTuple <= timeTuple:
310 self.info(
311 "Existing recording %s from previous event, overwriting",
312 location)
313 break
314
315 self.info(
316 "Existing recording %s from current event, changing name",
317 location)
318 location = self.location + '.' + str(i)
319 i += 1
320 self.location = location
321
322 self.info("Changing filename to %s", self.location)
323 try:
324 self.file = open(self.location, 'wb')
325 except IOError, e:
326 self.warning("Failed to open output file %s: %s",
327 self.location, log.getExceptionMessage(e))
328 m = messages.Error(T_(N_(
329 "Failed to open output file '%s' for writing. "
330 "Check permissions on the file."), self.location))
331 self.addMessage(m)
332 return
333 self._recordingStarted(self.file, self.location)
334 sink.emit('add', self.file.fileno())
335 self.uiState.set('filename', self.location)
336 self.uiState.set('recording', True)
337
338 if self._symlinkToCurrentRecording:
339 self._updateSymlink(self.location,
340 self._symlinkToCurrentRecording)
341
343 if not dest.startswith('/'):
344 dest = os.path.join(self.directory, dest)
345
346
347
348 self.debug("updating symbolic link %s to point to %s", dest, src)
349 try:
350 try:
351 os.symlink(src, dest)
352 except OSError, e:
353 if e.errno == errno.EEXIST and os.path.islink(dest):
354 os.unlink(dest)
355 os.symlink(src, dest)
356 else:
357 raise
358 except Exception, e:
359 self.info("Failed to update link %s: %s", dest,
360 log.getExceptionMessage(e))
361 m = messages.Warning(T_(N_("Failed to update symbolic link "
362 "'%s'. Check your permissions."), dest),
363 debug=log.getExceptionMessage(e))
364 self.addMessage(m)
365
381
401
402
403
405
406
407 if client_status == 4:
408
409
410 reactor.callFromThread(self._client_error_cb)
411
422
428
433
435
436
437
438 current = self.uiState.get('next-points')[:]
439 points = self.icalScheduler.getPoints()
440 new = []
441
442
443
444
445 def _utcAndStripTZ(dt):
446 from flumotion.common import eventcalendar
447 return dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)
448
449 for p in points:
450 dtUTC = _utcAndStripTZ(p.dt)
451 dtStart = p.eventInstance.start.replace(tzinfo=None)
452 new.append((dtUTC, p.which,
453 format.strftime(p.eventInstance.event.content,
454 dtStart.timetuple())))
455
456 for t in current:
457 if t not in new:
458 self.debug('removing tuple %r from next-points', t)
459 self.uiState.remove('next-points', t)
460
461 for t in new:
462 if t not in current:
463 self.debug('appending tuple %r to next-points', t)
464 self.uiState.append('next-points', t)
465
def _recordingStarted(self, file, location):
    """
    Tell every plug on the DiskerPlug socket that a recording started.

    @param file:     the file object being recorded to.
    @param location: the path of that file.
    """
    plugSocket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

    # nothing to notify when no plug is registered on the socket
    if plugSocket in self.plugs:
        for diskerPlug in self.plugs[plugSocket]:
            self.debug('invoking recordingStarted on plug %r on socket %s',
                       diskerPlug, plugSocket)
            diskerPlug.recordingStarted(file, location)
475
def _recordingStopped(self, file, location):
    """
    Tell every plug on the DiskerPlug socket that a recording stopped.

    @param file:     the file object that was recorded to.
    @param location: the path of that file.
    """
    plugSocket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

    # nothing to notify when no plug is registered on the socket
    if plugSocket in self.plugs:
        for diskerPlug in self.plugs[plugSocket]:
            self.debug('invoking recordingStopped on plug %r on socket %s',
                       diskerPlug, plugSocket)
            diskerPlug.recordingStopped(file, location)
485
486
487
def _markers_event_probe(self, pad, event):
    """
    Event probe reacting to 'FluStreamMark' custom downstream events.

    A mark whose 'action' is 'start' calls _onMarkerStart() with the
    mark's 'prog_id'; one whose 'action' is 'stop' calls
    _onMarkerStop(). Every other event is ignored.

    NOTE(review): this presumably runs in the streaming thread, while
    other gst callbacks in this component hop to the reactor with
    callFromThread - confirm that calling the marker handlers directly
    is intended.

    @param pad:   the object the probe was installed on (unused).
    @param event: the gst event seen by the probe.

    @returns: always True.
    """
    if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
        return True

    mark = event.get_structure()
    if mark.get_name() != 'FluStreamMark':
        return True

    action = mark['action']
    if action == 'start':
        self._onMarkerStart(mark['prog_id'])
    elif action == 'stop':
        self._onMarkerStop()
    return True
497
500
502 tmpl = self._defaultFilenameTemplate
503 if self._markerPrefix:
504 try:
505 tmpl = '%s%s' % (self._markerPrefix % data,
506 self._defaultFilenameTemplate)
507 except TypeError, err:
508 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
509 '%r <-- %r.'),
510 self._markerPrefix, data),
511 mid='expand-marker-prefix')
512 self.addMessage(m)
513 self.warning('Failed expanding filename prefix: '
514 '%r <-- %r; %r' %
515 (self._markerPrefix, data, err))
516 self.changeFilename(tmpl)
517