Package flumotion :: Package service :: Module service
[hide private]

Source Code for Module flumotion.service.service

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or 
 11  # (at your option) any later version. 
 12  # See "LICENSE.GPL" in the source distribution for more information. 
 13   
 14  # This program is also licensed under the Flumotion license. 
 15  # See "LICENSE.Flumotion" in the source distribution for more information. 
 16   
 17  """ 
 18  Servicer object used in service scripts 
 19  """ 
 20   
 21  import os 
 22  import glob 
 23  import time 
 24   
 25  from flumotion.configure import configure 
 26  from flumotion.common import errors, log 
 27  from flumotion.common.python import makedirs 
 28  from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \ 
 29       killPid, termPid, waitPidFile 
 30   
# Revision marker for this module.  The "$Rev: ... $" form looks like an
# expanded version-control keyword, so it is presumably maintained by the
# VCS rather than by hand -- TODO confirm.
__version__ = "$Rev: 7917 $"
 32   
 33   
class Servicer(log.Loggable):
    """
    I manage running managers and workers on behalf of a service script.

    Managers are the entries of the 'managers' directory and workers are
    the '*.xml' files of the 'workers' directory, both located below the
    configuration directory.  Processes are started through the shell and
    tracked by pid using the helpers imported from
    L{flumotion.common.process}.
    """

    # log category name; presumably picked up by log.Loggable to tag my
    # log output -- TODO confirm
    logCategory = 'servicer'
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    Set up the directories I scan and the directory overrides that I
    pass on to the processes I start.

    @type  configDir: string
    @param configDir: overridden path to the configuration directory;
                      when empty, configure.configdir is used.
    @type  logDir:    string
    @param logDir:    overridden path to the log directory.
    @type  runDir:    string
    @param runDir:    overridden path to the run directory.
    """
    # honour the documented override; it used to be silently ignored in
    # favour of configure.configdir
    baseDir = configDir or configure.configdir
    self.managersDir = os.path.join(baseDir, 'managers')
    self.workersDir = os.path.join(baseDir, 'workers')
    # only entries with a non-empty value end up on a command line,
    # see _getDirOptions
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
56
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a command should act on.

    @param command: name of the command; only used in error messages.
    @param args:    remaining command line arguments: empty to select
                    everything, or 'manager'/'worker' followed by a name.
                    Arguments after the name are not looked at.

    @returns: a (managers, workers) tuple of lists of names; both lists
              are sorted when everything is selected.
    @raises errors.FatalError: if the kind is not 'manager' or 'worker',
                               if the name is missing, or if nothing is
                               configured under that name.
    """
    # without arguments, act on everything that is configured
    if not args:
        allManagers = sorted(self.getManagers().keys())
        allWorkers = self.getWorkers()
        allWorkers.sort()
        return (allManagers, allWorkers)

    kind = args[0]
    if kind not in ('manager', 'worker'):
        raise errors.FatalError('Please specify either manager or worker')

    if len(args) < 2:
        raise errors.FatalError(
            'Please specify which %s to %s' % (kind, command))

    wanted = args[1]
    if kind == 'manager':
        if wanted not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % wanted)
        return ([wanted], [])

    # kind can only be 'worker' from here on
    if wanted not in self.getWorkers():
        raise errors.FatalError('No worker with name %s' % wanted)
    return ([], [wanted])
91
def _getDirOptions(self):
    """
    Build the command line options that pass my directory overrides on.

    @returns: one space-separated string of '--key=value' options, with
              an option for every override that has a value; the empty
              string if no override was set.
    """
    options = ['--%s=%s' % (option, path)
               for option, path in self._overrideDir.items()
               if path]
    return " ".join(options)
102
def getManagers(self):
    """
    Scan the managers directory for configured managers and their flows.

    Every entry of the managers directory counts as a manager.  Its flows
    are the '*.xml' files in its 'flows' subdirectory, if it has one.

    @returns: a dictionary of manager names -> list of flow names; empty
              if the managers directory does not exist.
    """
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # a missing flows directory simply matches nothing
        flowFiles = glob.glob(os.path.join(managerDir, 'flows', '*.xml'))
        # strip only the trailing extension, so that "<name>.xml" maps
        # back to this very file; splitting on the first ".xml" turned
        # "a.xml.old.xml" into a second flow called "a"
        flows = [os.path.splitext(os.path.basename(flowFile))[0]
                 for flowFile in flowFiles]
        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows

    self.log('returning managers: %r' % managers)
    return managers
128
def getWorkers(self):
    """
    Scan the workers directory for configured workers.

    Every '*.xml' file in the workers directory is a worker, named after
    the file without its extension.

    @returns: a sorted list of worker names; empty if the workers
              directory does not exist.
    """
    if not os.path.exists(self.workersDir):
        return []

    workerFiles = glob.glob(os.path.join(self.workersDir, '*.xml'))
    # strip only the trailing extension, so that "<name>.xml" maps back
    # to this very file; splitting on the first ".xml" turned
    # "w.xml.old.xml" into a second worker called "w"
    workers = [os.path.splitext(os.path.basename(workerFile))[0]
               for workerFile in workerFiles]
    workers.sort()
    return workers
144
def start(self, args):
    """
    Start processes as given in the args.

    With no args, all managers and workers are started.
    With "manager" as first argument, the named manager is started.
    With "worker" as first argument, the named worker is started.
    Managers are started before workers.

    @returns: an exit value reflecting the number of processes that failed
              to start
    """
    managers, workers = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    failures = 0
    for manager in managers:
        started = self.startManager(manager, flowsByManager[manager])
        if not started:
            failures += 1
    for worker in workers:
        started = self.startWorker(worker)
        if not started:
            failures += 1

    return failures
169
def stop(self, args):
    """
    Stop processes as given in the args.

    With no args, all managers and workers are stopped.
    With "manager" as first argument, the named manager is stopped.
    With "worker" as first argument, the named worker is stopped.
    Workers are stopped before managers.

    @returns: an exit value reflecting the number of processes that failed
              to stop
    """
    managers, workers = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    failures = 0
    for worker in workers:
        stopped = self.stopWorker(worker)
        if not stopped:
            failures += 1
    for manager in managers:
        stopped = self.stopManager(manager)
        if not stopped:
            failures += 1

    return failures
194
def status(self, args):
    """
    Print the status of the processes given in the args.

    One line is printed per manager, then per worker: not running (no
    pid known), running with its pid, or dead with a stale pid.
    """
    managers, workers = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))

    targets = [('manager', name) for name in managers]
    targets += [('worker', name) for name in workers]
    for kind, name in targets:
        pid = getPid(kind, name)
        if not pid:
            print("%s %s not running" % (kind, name))
        elif checkPidRunning(pid):
            print("%s %s is running with pid %d" % (kind, name, pid))
        else:
            print("%s %s dead (stale pid %d)" % (kind, name, pid))
211
def clean(self, args):
    """
    Clean up the pid files of dead processes given in the args.

    A pid file is deleted when no pid can be read for the process, or
    when the pid read is not running any more.  Processes that are still
    running are left alone.
    """
    managers, workers = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))

    targets = [('manager', name) for name in managers]
    targets += [('worker', name) for name in workers]
    for kind, name in targets:
        pid = getPid(kind, name)
        if pid:
            if checkPidRunning(pid):
                # still alive, nothing to clean up
                continue
            self.debug("Cleaning up stale pid %d for %s %s" % (
                pid, kind, name))
            print("deleting stale pid file for %s %s" % (kind, name))
            deletePidFile(kind, name)
            continue

        # no pid could be read; may be a file that contains bogus data
        try:
            deletePidFile(kind, name)
            print("deleted bogus pid file for %s %s" % (kind, name))
        except OSError:
            print("failed to delete pid file for %s %s "
                  "- ignoring" % (kind, name))
235
def condrestart(self, args):
    """
    Restart the processes given in the args, but only those running.

    With no args, all managers and workers are considered.
    With "manager" as first argument, the named manager is considered.
    With "worker" as first argument, the named worker is considered.

    Processes without a pid are skipped; for a stale pid a message is
    printed.  A process that fails to stop is not started again.

    @returns: an exit value reflecting the number of processes that failed
              to stop or to start
    """
    managers, workers = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    flowsByManager = self.getManagers()
    failures = 0

    targets = [('manager', name) for name in managers]
    targets += [('worker', name) for name in workers]
    for kind, name in targets:
        pid = getPid(kind, name)
        if not pid:
            continue
        if not checkPidRunning(pid):
            print("%s %s dead (stale pid %d)" % (kind, name, pid))
            continue

        # 'and' short-circuits: no start is attempted after a failed stop
        if kind == 'manager':
            restarted = (self.stopManager(name) and
                         self.startManager(name, flowsByManager[name]))
        else:
            restarted = self.stopWorker(name) and self.startWorker(name)
        if not restarted:
            failures += 1

    return failures
275
def create(self, args):
    """
    Create a default manager or worker config.

    @param args: [kind, name] optionally followed by a port, where kind
                 is 'manager' or 'worker'.  The port defaults to 7531;
                 for a worker it is the port of the manager to log in to.

    @raises errors.FatalError: if kind or name is missing, if kind is not
                               'manager' or 'worker', or if the port is
                               not a number.
    """
    # TODO: Andy suggested we should be able to customize the
    # configuration this generates.
    # For that we maybe first want to use the Command class way of
    # writing the service script.

    # validate the kind up front, so that a bad kind is reported as such
    # instead of as a missing name
    if not args or args[0] not in ('manager', 'worker'):
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]
    if len(args) == 1:
        raise errors.FatalError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    # >= rather than ==, so that trailing arguments do not make a given
    # port silently fall back to the default
    if len(args) >= 3:
        try:
            port = int(args[2])
        except ValueError:
            # report like every other usage error
            raise errors.FatalError(
                "Port %r is not a valid number." % (args[2], ))

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
304
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    A directory named after the manager is created in the managers
    directory, and a planet.xml is written into it.  If there is no
    default.pem in configure.configdir yet, the make-dummy-cert script
    from configure.datadir is run to create one.

    @param name: name of the manager to create.
    @param port: port to put in the manager configuration.

    @returns: True once the config was written.
    @raises errors.FatalError: if the manager directory already exists.
    """
    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.FatalError(
            "Manager directory %s already exists" % managerDir)
    makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    # create a default.pem file if it doesn't exist yet
    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        # files in datadir are usually not executable, so call through sh
        retval = os.system("sh %s %s" % (
            os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))

        # If we couldn't generate the file, it means that we probably
        # don't have openssl installed.  If so, don't include the complete
        # path to the pem file, which means that the default pem file
        # shipped with flumotion will be used instead.
        if retval != 0:
            pemFile = 'default.pem'

    # name the substituted values explicitly instead of exposing every
    # local variable to the template
    values = {
        'port': port,
        'pemFile': pemFile,
    }

    # generate the file; try/finally so the handle is closed even when
    # writing fails
    # NOTE(review): layout whitespace between the elements is not
    # significant to XML; the CDATA payload is kept flush-left.
    handle = open(planetFile, 'w')
    try:
        handle.write("""<planet>
  <manager>
    <debug>4</debug>
    <host>localhost</host>
    <port>%(port)d</port>
    <transport>ssl</transport>
    <!-- certificate path can be relative to $sysconfdir/flumotion,
         or absolute -->
    <certificate>%(pemFile)s</certificate>
    <component name="manager-bouncer" type="htpasswdcrypt-bouncer">
      <property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
    </component>
  </manager>
</planet>
""" % values)
    finally:
        handle.close()

    return True
356
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    The workers directory is created if needed, and "<name>.xml" is
    written into it.

    @param name:              name of the worker to create.
    @param managerPort:       port of the manager to put in the worker
                              configuration.
    @param randomFeederports: if true, configure random feeder ports;
                              otherwise only a commented-out example
                              port range is written.

    @returns: True once the config was written.
    @raises errors.FatalError: if the worker file already exists.
    """
    makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s already exists." % workerFile)

    feederports = " <!-- <feederports>8600-8639</feederports> -->"
    if randomFeederports:
        feederports = ' <feederports random="True" />'

    # name the substituted values explicitly instead of exposing every
    # local variable to the template
    values = {
        'managerPort': managerPort,
        'feederports': feederports,
    }

    # generate the file; try/finally so the handle is closed even when
    # writing fails
    # NOTE(review): layout whitespace between the elements is not
    # significant to XML.
    handle = open(workerFile, 'w')
    try:
        handle.write("""<worker>

  <debug>4</debug>

  <manager>
    <host>localhost</host>
    <port>%(managerPort)s</port>
  </manager>

  <authentication type="plaintext">
    <username>user</username>
    <password>test</password>
  </authentication>

%(feederports)s

</worker>
""" % values)
    finally:
        handle.close()

    return True
396
def startManager(self, name, flowNames):
    """
    Start the manager as configured in the manager directory for the given
    manager name, together with the given flows.

    @param name:      name of the manager to start.
    @param flowNames: names of the flows to load; each needs a
                      "<flow>.xml" in the manager's flows directory.

    @returns: whether or not the manager daemon started
    @raises errors.FatalError: if the planet file or a flow file does not
                               exist, or if a pid is already known for
                               this manager (running or stale).
    """
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)

    managerDir = os.path.join(self.managersDir, name)
    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.FatalError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.FatalError(
                "Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    # any known pid, alive or stale, blocks a start
    pid = getPid('manager', name)
    if pid:
        if checkPidRunning(pid):
            reason = "Manager %s is already running (with pid %d)"
        else:
            reason = "Manager %s is dead (stale pid %d)"
        raise errors.FatalError(reason % (name, pid))

    dirOptions = self._getDirOptions()
    command = "flumotion-manager %s -D --daemonize-to %s " \
        "--service-name %s %s %s" % (
        dirOptions, configure.daemondir, name, planetFile,
        " ".join(flowFiles))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    # the command returned cleanly; the pid file tells whether the
    # daemon actually came up
    self.debug("Waiting for pid for manager %s" % name)
    pid = waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False

    self.info("Started manager %s with pid %d" % (name, pid))
    return True
453
def startWorker(self, name):
    """
    Start the worker as configured in the worker directory for the given
    worker name.

    @param name: name of the worker to start; needs a "<name>.xml" in
                 the workers directory.

    @returns: whether or not the worker daemon started
    @raises errors.FatalError: if the worker file does not exist, or if a
                               pid is already known for this worker
                               (running or stale).
    """
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)

    # any known pid, alive or stale, blocks a start
    pid = getPid('worker', name)
    if pid:
        if checkPidRunning(pid):
            reason = "Worker %s is already running (with pid %d)"
        else:
            reason = "Worker %s is dead (stale pid %d)"
        raise errors.FatalError(reason % (name, pid))

    # we are sure the worker is not running and there's no pid file
    self.info("Loading worker %s" % workerFile)

    dirOptions = self._getDirOptions()
    command = "flumotion-worker %s -D --daemonize-to %s " \
        "--service-name %s %s" % (
        dirOptions, configure.daemondir, name, workerFile)
    self.debug("Running %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    # the command returned cleanly; the pid file tells whether the
    # daemon actually came up
    self.debug("Waiting for pid for worker %s" % name)
    pid = waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False

    self.info("Started worker %s with pid %d" % (name, pid))
    return True
499
def startProcess(self, command):
    """
    Run the given command with os.system and block until it returns.

    @param command: the complete command line to run.

    @returns: the exit status of the command, or -1 if it did not exit
              normally.
    """
    status = os.system(command)
    if not os.WIFEXITED(status):
        # definitely something wrong
        return -1

    return os.WEXITSTATUS(status)
512
def stopManager(self, name):
    """
    Stop the given manager if it is running.

    @param name: name of the manager to stop.

    @returns: True if the manager was stopped or no pid was known for it;
              False if its pid was stale or it could not be stopped.
    """
    self.info("Stopping manager %s" % name)
    pid = getPid('manager', name)
    if not pid:
        # say so, the same way stopWorker does
        self.info("manager %s was not running" % name)
        return True

    # FIXME: ensure a correct process is running this pid
    if not checkPidRunning(pid):
        self.info("Manager %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping manager %s with pid %d' % (name, pid))
    if not self.stopProcess(pid):
        return False

    self.info('Stopped manager %s with pid %d' % (name, pid))
    return True
533
def stopWorker(self, name):
    """
    Stop the given worker if it is running.

    @param name: name of the worker to stop.

    @returns: True if the worker was stopped or no pid was known for it;
              False if its pid was stale or it could not be stopped.
    """
    self.info("Stopping worker %s" % name)

    pid = getPid('worker', name)
    if not pid:
        self.info("worker %s was not running" % name)
        return True

    # FIXME: ensure a correct process is running this pid
    if not checkPidRunning(pid):
        self.info("Worker %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping worker %s with pid %d' % (name, pid))
    if self.stopProcess(pid):
        self.info('Stopped worker %s with pid %d' % (name, pid))
        return True

    return False
555
def stopProcess(self, pid):
    """
    Stop the process with the given pid.
    Wait until the pid has disappeared.

    The process is first asked to stop through termPid.  If it is still
    running configure.processTermWait seconds later, killPid is used; if
    it is still running configure.processKillWait seconds after that, I
    give up.

    @param pid: pid of the process to stop.

    @returns: True if the process is gone; False if termPid reported
              failure or the process outlived both waits.
    """
    # Deadlines must be wall-clock time: time.clock() counted CPU time on
    # Unix and is gone from Python 3.8 on.  Prefer the monotonic clock
    # where this Python has one.
    clock = getattr(time, 'monotonic', time.time)
    termDeadline = clock() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait

    self.debug('stopping process with pid %d' % pid)
    if not termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    # wait for the process to go away
    killed = False
    while checkPidRunning(pid):
        now = clock()
        if not killed and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                "for %d seconds, killing" % (pid,
                    configure.processTermWait))
            killPid(pid)
            # so it does not get triggered again
            killed = True

        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                "for %d seconds, stopping" % (pid,
                    configure.processKillWait))
            return False

        # poll at a modest rate instead of spinning at full CPU
        time.sleep(0.1)

    return True
589
def list(self):
    """
    Print all service parts managed: every manager followed by its
    flows, then every worker.
    """
    for name, flows in self.getManagers().items():
        print("manager %s" % name)
        # 'or ()' keeps an empty or missing flow list a no-op
        for flow in flows or ():
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
605