Source code for fedora_messaging.twisted.service
# This file is part of fedora_messaging.
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Twisted Service to start and stop the Fedora Messaging Twisted Factory.
This Service makes it easier to build a Twisted application that embeds a
Fedora Messaging component. See the ``verify_missing`` service in
fedmsg-migration-tools for a use case.
See https://twistedmatrix.com/documents/current/core/howto/application.html
"""
from __future__ import absolute_import, unicode_literals
import pika
from twisted.application import service
from twisted.application.internet import TCPClient, SSLClient
from twisted.internet import ssl
from .. import config
from .._session import _configure_tls_parameters
from .factory import FedoraMessagingFactory
[docs]class FedoraMessagingService(service.MultiService):
"""A Twisted service to connect to the Fedora Messaging broker."""
name = "fedora-messaging"
factoryClass = FedoraMessagingFactory
def __init__(self, on_message, amqp_url=None, bindings=None):
"""Initialize the service.
Args:
on_message (callable|None): Callback that will be passed each
incoming messages. If None, no message consuming is setup.
amqp_url (str): URL to use for the AMQP server.
bindings (list(dict)): A list of dictionaries that define queue
bindings to exchanges. This parameter can be used to override
the bindings declared in the configuration. See the
configuration documentation for details.
"""
service.MultiService.__init__(self)
amqp_url = amqp_url or config.conf["amqp_url"]
self._parameters = pika.URLParameters(amqp_url)
if amqp_url.startswith("amqps"):
_configure_tls_parameters(self._parameters)
if self._parameters.client_properties is None:
self._parameters.client_properties = config.conf["client_properties"]
self._bindings = bindings or config.conf["bindings"]
self._on_message = on_message
def startService(self):
self.connect()
if self._on_message:
self.getFactory().consume(self._on_message)
service.MultiService.startService(self)
def stopService(self):
factory = self.getFactory()
if not factory:
return
factory.stopTrying()
service.MultiService.stopService(self)
def getFactory(self):
if self.services:
return self.services[0].factory
return None
def connect(self):
factory = self.factoryClass(self._parameters, self._bindings)
if self._parameters.ssl_options:
serv = SSLClient(
host=self._parameters.host,
port=self._parameters.port,
factory=factory,
contextFactory=_ssl_context_factory(self._parameters),
)
else:
serv = TCPClient(
host=self._parameters.host, port=self._parameters.port, factory=factory
)
serv.factory = factory
name = "{}{}:{}".format(
"ssl:" if self._parameters.ssl_options else "",
self._parameters.host,
self._parameters.port,
)
serv.setName(name)
serv.setServiceParent(self)
def _ssl_context_factory(parameters):
"""
Produce a Twisted SSL context object from a pika connection parameter object.
This is necessary as Twisted manages the connection, not Pika.
Args:
parameters (pika.ConnectionParameters): The connection parameters built
from the fedora_messaging configuration.
"""
client_cert = None
key = config.conf["tls"]["keyfile"]
cert = config.conf["tls"]["certfile"]
with open(config.conf["tls"]["ca_cert"]) as fd:
ca_cert = ssl.Certificate.loadPEM(fd.read())
if key and cert:
# Note that _configure_tls_parameters sets the auth mode to EXTERNAL
# if both key and cert are defined, so we don't need to do that here.
with open(key) as fd:
client_keypair = fd.read()
with open(cert) as fd:
client_keypair += fd.read()
client_cert = ssl.PrivateCertificate.loadPEM(client_keypair)
context_factory = ssl.optionsForClientTLS(
parameters.host,
trustRoot=ca_cert,
clientCertificate=client_cert,
extraCertificateOptions={"raiseMinimumTo": ssl.TLSVersion.TLSv1_2},
)
return context_factory