Source code for fedora_messaging.twisted.factory

# This file is part of fedora_messaging.
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Twisted Factory to create the Fedora Messaging Twisted Protocol instance.

See https://twistedmatrix.com/documents/current/core/howto/clients.html#clientfactory
"""

from __future__ import absolute_import

import logging

import pika
from twisted.internet import defer, protocol, error

# twisted.logger is available with Twisted 15+
from twisted.python import log

from ..exceptions import PublishReturned, ConnectionException
from .protocol import FedoraMessagingProtocol


[docs]class FedoraMessagingFactory(protocol.ReconnectingClientFactory): """Reconnecting factory for the Fedora Messaging protocol.""" name = u"FedoraMessaging:Factory" protocol = FedoraMessagingProtocol def __init__(self, parameters, bindings): """Initialize the protocol. Args: parameters (pika.ConnectionParameters): The connection parameters. bindings (list of dict): which bindings to setup on connect. """ self.bindings = bindings self._parameters = parameters self._message_callback = None self.client = None self._client_ready = defer.Deferred()
[docs] def startedConnecting(self, connector): """Called when the connection to the broker has started. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ log.msg( "Connecting to the Fedora Messaging broker", system=self.name, logLevel=logging.DEBUG, )
[docs] def buildProtocol(self, addr): """Create the Protocol instance. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ self.resetDelay() log.msg("Connected to the Fedora Messaging broker", system=self.name) self.client = self.protocol(self._parameters) self.client.factory = self self.client.ready.addCallback(lambda _: self._on_client_ready()) return self.client
@defer.inlineCallbacks def _on_client_ready(self): """Called when the client is ready to send and receive messages.""" # Setup read (on connect and reconnect). if self._message_callback is not None: yield self.client.setupRead(self._message_callback) yield self.client.resumeProducing() # Run ready callbacks. self._client_ready.callback(None)
[docs] def clientConnectionLost(self, connector, reason): """Called when the connection to the broker has been lost. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ if not isinstance(reason.value, error.ConnectionDone): log.msg( "Lost connection. Reason: {}".format(reason.value), system=self.name, logLevel=logging.WARNING, ) if self._client_ready.called: # Renew the ready deferred, it will callback when the # next connection is ready. self._client_ready = defer.Deferred() protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
[docs] def clientConnectionFailed(self, connector, reason): """Called when the client has failed to connect to the broker. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ log.msg( "Connection failed. Reason: {}".format(reason.value), system=self.name, logLevel=logging.WARNING, ) protocol.ReconnectingClientFactory.clientConnectionFailed( self, connector, reason )
[docs] def stopTrying(self): """Stop trying to reconnect to the broker. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ protocol.ReconnectingClientFactory.stopTrying(self) if not self._client_ready.called: self._client_ready.errback( pika.exceptions.AMQPConnectionError( u"Could not connect, reconnection cancelled." ) )
[docs] @defer.inlineCallbacks def stopFactory(self): """Stop the factory. See the documentation of `twisted.internet.protocol.ReconnectingClientFactory` for details. """ if self.client: yield self.client.stopProducing() protocol.ReconnectingClientFactory.stopFactory(self)
[docs] @defer.inlineCallbacks def consume(self, message_callback): """Pass incoming messages to the provided callback. Args: message_callback (callable): The callable to pass the message to when one arrives. """ log.msg("Setup messages consumption.", system=self.name, logLevel=logging.DEBUG) new_setup = self._message_callback is None self._message_callback = message_callback if self._client_ready.called and new_setup: # If consume() is called after the client is ready (and we did # not setup before), do it now. yield self.client.setupRead(self._message_callback) yield self.client.resumeProducing()
[docs] @defer.inlineCallbacks def publish(self, message, exchange=None): """ Publish a :class:`fedora_messaging.message.Message` to an `exchange`_ on the message broker. Args: message (message.Message): The message to publish. exchange (str): The name of the AMQP exchange to publish to; defaults to :ref:`conf-publish-exchange` Raises: PublishReturned: If the published message is rejected by the broker. ConnectionException: If a connection error occurs while publishing. .. _exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges """ yield self._client_ready try: yield self.client.publish(message, exchange) except (pika.exceptions.ConnectionClosed, pika.exceptions.ChannelClosed) as e: log.msg( "Connection lost while publishing, retrying.", system=self.name, logLevel=logging.WARNING, ) yield self.publish(message, exchange) except (pika.exceptions.NackError, pika.exceptions.UnroutableError) as e: log.msg( "Message was rejected by the broker ({})".format(e), system=self.name, logLevel=logging.WARNING, ) raise PublishReturned(reason=e) except pika.exceptions.AMQPError as e: self.stopTrying() yield self.client.close() raise ConnectionException(reason=e)