Source code for backend.daemons.dispatcher

from datetime import datetime
import json
import os
import time
import gzip
import shutil
import multiprocessing
from setproctitle import setproctitle

from ..vm_manage.manager import VmManager
from ..exceptions import MockRemoteError, CoprWorkerError, VmError, NoVmAvailable
from ..job import BuildJob
from ..mockremote import MockRemote
from ..constants import BuildStatus, JOB_GRAB_TASK_END_PUBSUB, build_log_format
from ..helpers import register_build_result, get_redis_connection, get_redis_logger, \
    local_file_logger
from .. import jobgrabcontrol


# ansible_playbook = "ansible-playbook"

try:
    import fedmsg
except ImportError:
    # fedmsg is optional
    fedmsg = None


class Worker(multiprocessing.Process):
    """
    Worker process that dispatches build tasks. The backend spins up
    multiple workers; each worker is associated with one group_id and
    processes one task at a time.

    :param Munch opts: backend config
    :param int worker_num: worker number
    :param int group_id: group_id from the set of groups defined in config
    """

    def __init__(self, opts, frontend_client, worker_num, group_id):
        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        self.opts = opts
        self.worker_num = worker_num
        self.group_id = group_id

        self.log = get_redis_logger(self.opts, self.logger_name, "worker")
        self.jg = jobgrabcontrol.Channel(self.opts, self.log)

        # event queue for communicating back to dispatcher
        self.kill_received = False

        self.frontend_client = frontend_client
        self.vm_name = None
        self.vm_ip = None

        self.rc = None
        self.vmm = VmManager(self.opts)

    @property
    def logger_name(self):
        return "backend.worker-{}-{}".format(self.group_name, self.worker_num)
    @property
    def group_name(self):
        try:
            return self.opts.build_groups[self.group_id]["name"]
        except Exception as error:
            self.log.exception("Failed to get builder group name from config, "
                               "using group_id as name. Original error: {}"
                               .format(error))
            return str(self.group_id)
    def fedmsg_notify(self, topic, template, content=None):
        """
        Publish a message to the fedmsg bus when it is available

        :param topic: fedmsg topic suffix (e.g. "build.start")
        :param template: format string rendered into the "what" field
        :param content: optional dict with the message fields
        """
        if self.opts.fedmsg_enabled and fedmsg:
            who = "worker-{0}".format(self.worker_num)
            content = content or {}
            content["who"] = who
            content["what"] = template.format(**content)

            try:
                fedmsg.publish(modname="copr", topic=topic, msg=content)
            # pylint: disable=W0703
            except Exception as e:
                self.log.exception("failed to publish message: {0}".format(e))
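    # For example, with hypothetical values, a call like
    #   self.fedmsg_notify("build.start", "build start: user:{user}",
    #                      {"user": "bob"})
    # from worker number 3 would publish a message under the copr
    # "build.start" topic with the content:
    #   {"user": "bob", "who": "worker-3", "what": "build start: user:bob"}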
    def _announce_start(self, job):
        """
        Announce everywhere that a build process started now.
        """
        job.started_on = time.time()
        self.mark_started(job)

        template = "build start: user:{user} copr:{copr}" \
            " pkg: {pkg} build:{build} ip:{ip} pid:{pid}"
        content = dict(user=job.submitter, copr=job.project_name,
                       owner=job.project_owner, pkg=job.package_name,
                       build=job.build_id, ip=self.vm_ip, pid=self.pid)
        self.fedmsg_notify("build.start", template, content)

        template = "chroot start: chroot:{chroot} user:{user}" \
            " copr:{copr} pkg: {pkg} build:{build} ip:{ip} pid:{pid}"
        content = dict(chroot=job.chroot, user=job.submitter,
                       owner=job.project_owner, pkg=job.package_name,
                       copr=job.project_name, build=job.build_id,
                       ip=self.vm_ip, pid=self.pid)
        self.fedmsg_notify("chroot.start", template, content)
    def _announce_end(self, job):
        """
        Announce everywhere that a build process ended now.
        """
        job.ended_on = time.time()
        self.return_results(job)
        self.log.info("worker finished build: {0}".format(self.vm_ip))
        template = "build end: user:{user} copr:{copr} build:{build}" \
            " pkg: {pkg} version: {version} ip:{ip} pid:{pid} status:{status}"
        content = dict(user=job.submitter, copr=job.project_name,
                       owner=job.project_owner, pkg=job.package_name,
                       version=job.package_version, build=job.build_id,
                       ip=self.vm_ip, pid=self.pid, status=job.status,
                       chroot=job.chroot)
        self.fedmsg_notify("build.end", template, content)
    def mark_started(self, job):
        """
        Send data about the started build to the frontend
        """
        job.status = BuildStatus.RUNNING
        build = job.to_dict()
        self.log.info("starting build: {}".format(build))

        data = {"builds": [build]}
        try:
            self.frontend_client.update(data)
        except Exception:
            raise CoprWorkerError(
                "Could not communicate to front end to submit status info")
    def return_results(self, job):
        """
        Send the build results to the frontend
        """
        self.log.info("Build {} finished with status {}. Took {} seconds"
                      .format(job.build_id, job.status,
                              job.ended_on - job.started_on))

        data = {"builds": [job.to_dict()]}
        try:
            self.frontend_client.update(data)
        except Exception as err:
            raise CoprWorkerError(
                "Could not communicate to front end to submit results: {}"
                .format(err))
    def starting_build(self, job):
        """
        Announce to the frontend that a build is starting.
        Checks if we can and/or should start the job.

        :return True: if the build can start
        :return False: if the build can not start (build is cancelled)
        """
        try:
            return self.frontend_client.starting_build(job.build_id, job.chroot)
        except Exception:
            msg = "Could not communicate to front end to confirm build start"
            self.log.exception(msg)
            raise CoprWorkerError(msg)
    @classmethod
    def pkg_built_before(cls, pkg, chroot, destdir):
        """
        Check whether the package has already been built in this chroot.
        """
        s_pkg = os.path.basename(pkg)
        pdn = s_pkg.replace(".src.rpm", "")
        resdir = "{0}/{1}/{2}".format(destdir, chroot, pdn)
        resdir = os.path.normpath(resdir)
        if os.path.exists(resdir) and os.path.exists(os.path.join(resdir, "success")):
            return True
        return False
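    # Illustrative sketch with hypothetical paths: a call like
    #   Worker.pkg_built_before("/tmp/foo-1.0-1.src.rpm",
    #                           "fedora-23-x86_64",
    #                           "/var/lib/copr/results/owner/project")
    # returns True only when the marker file
    #   /var/lib/copr/results/owner/project/fedora-23-x86_64/foo-1.0-1/success
    # was left behind by a previous successful build.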
    def init_fedmsg(self):
        """
        Initialize fedmsg
        (this assumes there are certs and a fedmsg config on disk)
        """
        if not (self.opts.fedmsg_enabled and fedmsg):
            return

        try:
            fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)
        except Exception as e:
            self.log.exception("Failed to initialize fedmsg: {}".format(e))

    # TODO: doing skip logic on frontend during @start_build query
    # def on_pkg_skip(self, job):
    #     """
    #     Handle package skip
    #     """
    #     self._announce_start(job)
    #     self.log.info("Skipping: package {} has been already built before."
    #                   .format(job.pkg))
    #     job.status = BuildStatus.SKIPPED
    #     self.notify_job_grab_about_task_end(job)
    #     self._announce_end(job)
    def obtain_job(self):
        """
        Retrieve a new build task from the queue.
        Checks if the new job can be started and is not skipped.
        """
        # TODO: remove retask, use redis lua fsm logic similar to VMM
        # this sometimes caused TypeError in a random worker
        # when another one picked up a task to build
        # why?
        # praiskup: not reproduced
        try:
            task = self.jg.get_build(self.group_id)
        except TypeError as err:
            self.log.warning(err)
            return

        if not task:
            return

        job = BuildJob(task, self.opts)
        self.update_process_title(suffix="Task: {} chroot: {}, obtained at {}"
                                  .format(job.build_id, job.chroot,
                                          str(datetime.now())))
        return job
    def do_job(self, job):
        """
        Execute a new job.

        :param job: :py:class:`~backend.job.BuildJob`
        """
        self._announce_start(job)
        self.update_process_title(suffix="Task: {} chroot: {} build started"
                                  .format(job.build_id, job.chroot))
        status = BuildStatus.SUCCEEDED

        # setup our target dir locally
        if not os.path.exists(job.chroot_dir):
            try:
                os.makedirs(job.chroot_dir)
            except (OSError, IOError):
                self.log.exception("Could not make results dir for job: {}"
                                   .format(job.chroot_dir))
                status = BuildStatus.FAILURE

        self.clean_result_directory(job)

        if status == BuildStatus.SUCCEEDED:
            # FIXME
            # need a plugin hook or some mechanism to check random
            # info about the pkgs
            # this should use ansible to download the pkg on
            # the remote system
            # and run a series of checks on the package before we
            # start the build - most importantly license checks.

            self.log.info("Starting build: id={} builder={} job: {}"
                          .format(job.build_id, self.vm_ip, job))

            with local_file_logger(
                    "{}.builder.mr".format(self.logger_name),
                    job.chroot_log_path,
                    fmt=build_log_format) as build_logger:
                try:
                    mr = MockRemote(
                        builder_host=self.vm_ip,
                        job=job,
                        logger=build_logger,
                        opts=self.opts
                    )
                    mr.check()

                    build_details = mr.build_pkg_and_process_results()
                    job.update(build_details)

                    if self.opts.do_sign:
                        mr.add_pubkey()

                    register_build_result(self.opts)
                except MockRemoteError as e:
                    # record and break
                    self.log.exception(
                        "Error during the build, host={}, build_id={}, "
                        "chroot={}, error: {}"
                        .format(self.vm_ip, job.build_id, job.chroot, e))
                    status = BuildStatus.FAILURE
                    register_build_result(self.opts, failed=True)

            self.log.info(
                "Finished build: id={} builder={} timeout={} destdir={}"
                " chroot={} repos={}"
                .format(job.build_id, self.vm_ip, job.timeout,
                        job.destdir, job.chroot, str(job.repos)))
            self.copy_mock_logs(job)

        job.status = status
        self._announce_end(job)
        self.update_process_title(suffix="Task: {} chroot: {} done"
                                  .format(job.build_id, job.chroot))
    def copy_mock_logs(self, job):
        if not os.path.isdir(job.results_dir):
            self.log.info("Job results dir doesn't exist, couldn't copy "
                          "main log; path: {}".format(job.results_dir))
            return

        log_names = [(job.chroot_log_name, "mockchain.log.gz"),
                     (job.rsync_log_name, "rsync.log.gz")]

        for src_name, dst_name in log_names:
            src = os.path.join(job.chroot_dir, src_name)
            dst = os.path.join(job.results_dir, dst_name)
            try:
                with open(src, "rb") as f_src, gzip.open(dst, "wb") as f_dst:
                    f_dst.writelines(f_src)
            except IOError:
                self.log.info("File {} not found".format(src))
    def clean_result_directory(self, job):
        """
        Create a backup directory and move results from the previous
        build there.
        """
        if not os.path.exists(job.results_dir) or os.listdir(job.results_dir) == []:
            return

        backup_dir_name = "prev_build_backup"
        backup_dir = os.path.join(job.results_dir, backup_dir_name)
        self.log.info("Cleaning target directory, storing results "
                      "from previous build in {}".format(backup_dir))

        if not os.path.exists(backup_dir):
            os.makedirs(backup_dir)

        files = (x for x in os.listdir(job.results_dir) if x != backup_dir_name)
        for filename in files:
            file_path = os.path.join(job.results_dir, filename)
            if os.path.isfile(file_path):
                if file_path.endswith((".info", ".log", ".log.gz")):
                    os.rename(file_path, os.path.join(backup_dir, filename))
                elif not file_path.endswith(".rpm"):
                    os.remove(file_path)
            else:
                shutil.rmtree(file_path)
    def update_process_title(self, suffix=None):
        title = "worker-{} {} ".format(self.group_name, self.worker_num)
        if self.vm_ip:
            title += "VM_IP={} ".format(self.vm_ip)
        if self.vm_name:
            title += "VM_NAME={} ".format(self.vm_name)
        if suffix:
            title += str(suffix)
        setproctitle(title)
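    # For instance (hypothetical values), a worker in group "pc" with
    # worker_num=3 building on 10.0.0.5 would carry a process title like:
    #   "worker-pc 3 VM_IP=10.0.0.5 Task: 1234 chroot: fedora-23-x86_64 done"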
    def notify_job_grab_about_task_end(self, job, do_reschedule=False):
        # TODO: Current notification method is unreliable,
        # we should retask and use redis + lua for atomic acquire/release tasks
        request = {
            "action": "reschedule" if do_reschedule else "remove",
            "build_id": job.build_id,
            "task_id": job.task_id,
            "chroot": job.chroot,
        }
        self.rc.publish(JOB_GRAB_TASK_END_PUBSUB, json.dumps(request))
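    # The published pub/sub payload is a small JSON document; with
    # hypothetical IDs, a rescheduled task would look like:
    #   {"action": "reschedule", "build_id": 1234,
    #    "task_id": "1234-fedora-23-x86_64", "chroot": "fedora-23-x86_64"}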
    def acquire_vm_for_job(self, job):
        # TODO: replace acquire/release with context manager
        self.log.info("got job: {}, acquiring VM for build".format(str(job)))
        start_vm_wait_time = time.time()
        vmd = None
        while vmd is None:
            try:
                self.update_process_title(
                    suffix="trying to acquire VM for job {} for {}s"
                    .format(job.task_id, int(time.time() - start_vm_wait_time)))
                vmd = self.vmm.acquire_vm(self.group_id, job.project_owner,
                                          os.getpid(), job.task_id,
                                          job.build_id, job.chroot)
            except NoVmAvailable as error:
                self.log.info("No VM yet: {}".format(error))
                time.sleep(self.opts.sleeptime)
                continue
            except Exception as error:
                self.log.exception("Unhandled exception during VM acquire: {}"
                                   .format(error))
                break
        return vmd
    def run_cycle(self):
        """
        One iteration of the worker loop: obtain a job, acquire a VM,
        run the build on it, and release the VM again.
        """
        self.update_process_title(suffix="trying to acquire job")
        time.sleep(self.opts.sleeptime)

        job = self.obtain_job()
        if not job:
            return

        try:
            if not self.starting_build(job):
                self.notify_job_grab_about_task_end(job)
                return
        except Exception:
            self.log.exception("Failed to check if job can be started")
            self.notify_job_grab_about_task_end(job)
            return

        vmd = self.acquire_vm_for_job(job)

        if vmd is None:
            self.notify_job_grab_about_task_end(job, do_reschedule=True)
        else:
            self.log.info("acquired VM: {} ip: {} for build {}"
                          .format(vmd.vm_name, vmd.vm_ip, job.task_id))
            # TODO: store self.vmd = vmd and use it
            self.vm_name = vmd.vm_name
            self.vm_ip = vmd.vm_ip

            try:
                self.do_job(job)
                self.notify_job_grab_about_task_end(job)
            except VmError as error:
                self.log.exception("Builder error, re-scheduling task: {}"
                                   .format(error))
                self.notify_job_grab_about_task_end(job, do_reschedule=True)
            except Exception as error:
                self.log.exception("Unhandled build error: {}".format(error))
                self.notify_job_grab_about_task_end(job, do_reschedule=True)
            finally:
                # clean up the instance
                self.vmm.release_vm(vmd.vm_name)
                self.vm_ip = None
                self.vm_name = None
    def run(self):
        self.log.info("Starting worker")
        self.init_fedmsg()
        self.vmm.post_init()

        self.rc = get_redis_connection(self.opts)
        self.update_process_title(suffix="trying to acquire job")

        while not self.kill_received:
            self.run_cycle()
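# Illustrative usage sketch (an assumption, not part of this module): the
# backend daemon is expected to spawn one Worker per configured slot,
# roughly along these lines; "opts", "frontend_client" and the
# "max_workers" key are hypothetical here.
#
#   workers = []
#   for group_id, group in enumerate(opts.build_groups):
#       for worker_num in range(group["max_workers"]):
#           w = Worker(opts, frontend_client, worker_num, group_id)
#           w.start()
#           workers.append(w)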