Source code for pika.adapters.twisted_connection

"""Using Pika with a Twisted reactor.

Supports two methods of establishing the connection, using TwistedConnection
or TwistedProtocolConnection. For details about each method, see the docstrings
of the corresponding classes.

The interfaces in this module are Deferred-based when possible. This means that
the connection.channel() method and most of the channel methods return
Deferreds instead of taking a callback argument and that basic_consume()
returns a Twisted DeferredQueue where messages from the server will be
stored. Refer to the docstrings for TwistedConnection.channel() and the
TwistedChannel class for details.

"""
import functools
from twisted.internet import defer, error, reactor
from twisted.python import log

from pika import connection
from pika import exceptions
from pika.adapters import base_connection


class ClosableDeferredQueue(defer.DeferredQueue):
    """A DeferredQueue that can be shut down with an error.

    Behaves like Twisted's DeferredQueue until close() is called with an
    Exception instance. At that point every Deferred still waiting on the
    queue is errbacked with that exception, and any later get() or put()
    returns an already-failed Deferred wrapping the same exception.
    """

    def __init__(self, size=None, backlog=None):
        # ``closed`` holds the close reason; None means the queue is open.
        self.closed = None
        super(ClosableDeferredQueue, self).__init__(size, backlog)

    def put(self, obj):
        """Queue ``obj``, or return a failed Deferred if the queue is closed."""
        reason = self.closed
        if not reason:
            return defer.DeferredQueue.put(self, obj)
        return defer.fail(reason)

    def get(self):
        """Fetch the next item, or return a failed Deferred if closed."""
        reason = self.closed
        if not reason:
            return defer.DeferredQueue.get(self)
        return defer.fail(reason)

    def close(self, reason):
        """Close the queue, failing all waiters with ``reason``.

        :param Exception reason: stored as the closed marker and passed to
            the errback of every Deferred currently waiting in get().
        """
        self.closed = reason
        # Fail the waiting Deferreds one by one, newest first.
        while self.waiting:
            waiter = self.waiting.pop()
            waiter.errback(reason)
        # Drop whatever was queued but never consumed.
        self.pending = []


class TwistedChannel(object):
    """A wrapper around Pika's Channel.

    Channel methods that normally take a callback argument are wrapped to
    return a Deferred that fires with whatever would be passed to the
    callback. If the channel gets closed, all pending Deferreds are errbacked
    with a ChannelClosed exception. The returned Deferreds fire with whatever
    arguments the callback to the original method would receive.

    The basic_consume method is wrapped in a special way, see its docstring
    for details.
    """

    # Channel methods that __getattr__ wraps to return a Deferred; every
    # other attribute access is forwarded to the underlying channel untouched.
    WRAPPED_METHODS = ('exchange_declare', 'exchange_delete', 'queue_declare',
                       'queue_bind', 'queue_purge', 'queue_unbind',
                       'basic_qos', 'basic_get', 'basic_recover', 'tx_select',
                       'tx_commit', 'tx_rollback', 'flow', 'basic_cancel')

    def __init__(self, channel):
        """Wrap ``channel`` and register for its close notification.

        :param channel: the Pika channel to wrap; it must provide
            add_on_close_callback().
        """
        self.__channel = channel
        # None while open; the ChannelClosed exception once closed.
        self.__closed = None
        # Deferreds of wrapped calls that have not fired yet.
        self.__calls = set()
        # Maps queue name -> set of ClosableDeferredQueue objects fed by
        # consumers on that queue.
        self.__consumers = {}

        channel.add_on_close_callback(self.channel_closed)

    def channel_closed(self, channel, reply_code, reply_text):
        """Close callback: fail everything that is still outstanding.

        :param channel: the channel that was closed (unused).
        :param reply_code: passed to the ChannelClosed exception.
        :param reply_text: passed to the ChannelClosed exception.
        """
        # enter the closed state
        self.__closed = exceptions.ChannelClosed(reply_code, reply_text)
        # errback all pending calls
        for d in self.__calls:
            d.errback(self.__closed)
        # close all open queues
        for consumers in self.__consumers.values():
            for c in consumers:
                c.close(self.__closed)
        # release references to stored objects
        self.__calls = set()
        self.__consumers = {}

    def basic_consume(self, *args, **kwargs):
        """Consume from a server queue.

        Returns a Deferred that fires with a tuple:
        (queue_object, consumer_tag). The queue object is an instance of
        ClosableDeferredQueue, where data received from the queue will be
        stored. Clients should use its get() method to fetch individual
        messages.

        The queue name must be given as the ``queue`` keyword argument. Any
        ``consumer_callback`` keyword argument is replaced by one that puts
        the received arguments, as a tuple, on the returned queue object.

        If the channel is closed, or the underlying basic_consume raises, the
        returned Deferred is already failed.
        """
        if self.__closed:
            return defer.fail(self.__closed)

        queue = ClosableDeferredQueue()
        queue_name = kwargs['queue']
        kwargs['consumer_callback'] = lambda *args: queue.put(args)
        self.__consumers.setdefault(queue_name, set()).add(queue)

        try:
            consumer_tag = self.__channel.basic_consume(*args, **kwargs)
        except Exception:
            # The caller never receives ``queue``, so stop tracking it
            # instead of keeping an orphan until the channel closes.
            consumers = self.__consumers.get(queue_name)
            if consumers is not None:
                consumers.discard(queue)
                if not consumers:
                    del self.__consumers[queue_name]
            # defer.fail() without arguments wraps the active exception.
            return defer.fail()

        return defer.succeed((queue, consumer_tag))

    def queue_delete(self, *args, **kwargs):
        """Wraps the method the same way all the others are wrapped, but
        removes the reference to the queue object after it gets deleted on
        the server.

        The queue name must be given as the ``queue`` keyword argument.
        """
        wrapped = self.__wrap_channel_method('queue_delete')
        queue_name = kwargs['queue']

        d = wrapped(*args, **kwargs)
        return d.addCallback(self.__clear_consumer, queue_name)

    def basic_publish(self, *args, **kwargs):
        """Make sure the channel is not closed and then publish. Return a
        Deferred that fires with the result of the channel's basic_publish.
        """
        if self.__closed:
            return defer.fail(self.__closed)
        return defer.succeed(self.__channel.basic_publish(*args, **kwargs))

    def __wrap_channel_method(self, name):
        """Wrap Pika's Channel method to make it return a Deferred that fires
        when the method completes and errbacks if the channel gets closed. If
        the original method's callback would receive more than one argument,
        the Deferred fires with a tuple of argument values.
        """
        method = getattr(self.__channel, name)

        @functools.wraps(method)
        def wrapped(*args, **kwargs):
            if self.__closed:
                return defer.fail(self.__closed)

            d = defer.Deferred()
            # Track the call so channel_closed() can errback it; the
            # callback below untracks it again once it fires.
            self.__calls.add(d)
            d.addCallback(self.__clear_call, d)

            def single_argument(*args):
                """
                Make sure that the deferred is called with a single argument.
                In case the original callback fires with more than one,
                convert to a tuple.
                """
                if len(args) > 1:
                    d.callback(tuple(args))
                else:
                    d.callback(*args)

            kwargs['callback'] = single_argument

            try:
                method(*args, **kwargs)
            except Exception:
                # The caller gets a separate failed Deferred, so ``d`` would
                # never be observed. Untrack it; otherwise it stays in the
                # pending set and is errbacked, with no handler attached,
                # when the channel closes.
                self.__calls.discard(d)
                # defer.fail() without arguments wraps the active exception.
                return defer.fail()
            return d

        return wrapped

    def __clear_consumer(self, ret, queue_name):
        # Deferred callback: forget the queues of a deleted server queue and
        # pass the result through unchanged.
        self.__consumers.pop(queue_name, None)
        return ret

    def __clear_call(self, ret, d):
        # Deferred callback: the call completed, stop tracking its Deferred
        # and pass the result through unchanged.
        self.__calls.discard(d)
        return ret

    def __getattr__(self, name):
        # Wrap methods defined in WRAPPED_METHODS, forward the rest of
        # accesses to the channel.
        if name in self.WRAPPED_METHODS:
            return self.__wrap_channel_method(name)
        return getattr(self.__channel, name)
class IOLoopReactorAdapter(object):
    """Expose a Twisted reactor through Pika's IOLoop interface.

    Constructed from a TwistedConnection object and a Twisted reactor object.
    """

    def __init__(self, connection, reactor):
        self.started = False
        self.reactor = reactor
        self.connection = connection

    def add_timeout(self, deadline, callback_method):
        """Schedule callback_method to run after deadline seconds.

        Unlike Tornado's timeouts, the value passed in is a delay relative to
        now, not the absolute time at which the callback should run.

        :param int deadline: The number of seconds to wait to call callback
        :param method callback_method: The callback method
        :rtype: twisted.internet.interfaces.IDelayedCall

        """
        delayed_call = self.reactor.callLater(deadline, callback_method)
        return delayed_call

    def remove_timeout(self, call):
        """Cancel a call previously scheduled with add_timeout.

        :param twisted.internet.interfaces.IDelayedCall call: The call to
            cancel

        """
        call.cancel()

    def add_callback_threadsafe(self, callback):
        """Ask for callback to be invoked as soon as possible in the context
        of this IOLoop's thread.

        NOTE: This is the only thread-safe method offered by the IOLoop
        adapter. Every other manipulation of the adapter and of its parent
        connection must happen on the connection's thread.

        For example, a thread may request a call to the `channel.basic_ack`
        method of a connection that is running in a different thread via::

            connection.add_callback_threadsafe(
                functools.partial(channel.basic_ack, delivery_tag=...))

        :param method callback: The callback method; must be callable.

        """
        self.reactor.callFromThread(callback)

    def stop(self):
        """Stop the reactor, unless this adapter has not started it."""
        # Only stop once per start, so the reactor is never stopped twice.
        if self.started:
            self.started = False
            self.reactor.stop()

    def start(self):
        """Run the reactor, unless this adapter has already started it."""
        # Only start once, so the reactor is never run twice.
        if not self.started:
            self.started = True
            self.reactor.run()

    def remove_handler(self, _):
        """Detach the connection from the reactor for reads and writes."""
        # The fileno argument is ignored: the connection itself is what gets
        # registered with the reactor, and it supplies its fileno on demand.
        watched = self.connection
        self.reactor.removeReader(watched)
        self.reactor.removeWriter(watched)

    def update_handler(self, _, event_state):
        """Re-register the connection according to the event_state bitmask."""
        # As in remove_handler, the fileno argument is ignored. Start from a
        # clean slate, then subscribe to just the requested events.
        self.reactor.removeReader(self.connection)
        self.reactor.removeWriter(self.connection)

        wants_read = event_state & self.connection.READ
        if wants_read:
            self.reactor.addReader(self.connection)

        wants_write = event_state & self.connection.WRITE
        if wants_write:
            self.reactor.addWriter(self.connection)
class TwistedConnection(base_connection.BaseConnection):
    """A standard Pika connection adapter.

    Instantiate it with the connection parameters and the connected callback;
    once that callback gets called the connection can be used.

    The drawback is that the connection is established with the blocking
    socket module. For instance, if the host you are connecting to is behind
    a misconfigured firewall that just drops packets, the whole process will
    freeze until the connection timeout passes. To work around that problem,
    use TwistedProtocolConnection, but read its docstring first.

    Objects of this class get put in the Twisted reactor, which notifies them
    when the socket connection becomes readable or writable. So apart from
    implementing the BaseConnection interface, they also provide Twisted's
    IReadWriteDescriptor interface.
    """

    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 stop_ioloop_on_close=False):
        # The ioloop handed to the base class is an adapter around the
        # module-level Twisted reactor.
        ioloop_adapter = IOLoopReactorAdapter(self, reactor)
        super(TwistedConnection, self).__init__(
            parameters=parameters,
            on_open_callback=on_open_callback,
            on_open_error_callback=on_open_error_callback,
            on_close_callback=on_close_callback,
            ioloop=ioloop_adapter,
            stop_ioloop_on_close=stop_ioloop_on_close)

    def _adapter_connect(self):
        """Connect to the RabbitMQ broker"""
        # The base class connects (blockingly!) to the server and returns a
        # truthy value on failure.
        connect_error = super(TwistedConnection, self)._adapter_connect()
        if connect_error:
            return connect_error
        # Register for the I/O events we are waiting for. None stands in for
        # the file descriptor, which IOLoopReactorAdapter ignores.
        self.ioloop.update_handler(None, self.event_state)
        return connect_error

    def _adapter_disconnect(self):
        """Called when the adapter should disconnect"""
        self.ioloop.remove_handler(None)
        self._cleanup_socket()

    def _on_connected(self):
        """Run the superclass handler, then refresh the event state so the
        outgoing frame gets flushed out.

        Commit 50d842526d9f12d32ad9f3c4910ef60b8c301f59 removed a
        self._flush_outbound call that was in _send_frame which previously
        made this step unnecessary.

        """
        super(TwistedConnection, self)._on_connected()
        self._manage_event_state()

    def channel(self, channel_number=None):
        """Return a Deferred that fires with an instance of a wrapper around
        the Pika Channel class.

        """
        channel_opened = defer.Deferred()
        # The Deferred's callback is what the base class invokes once the
        # channel is open.
        base_connection.BaseConnection.channel(self, channel_opened.callback,
                                               channel_number)
        # Wrap the resulting Pika channel before handing it to the caller.
        channel_opened.addCallback(TwistedChannel)
        return channel_opened

    # IReadWriteDescriptor methods

    def fileno(self):
        """Return the file descriptor of the connection's socket."""
        return self.socket.fileno()

    def logPrefix(self):
        """Return the prefix identifying this descriptor in Twisted logs."""
        return "twisted-pika"

    def connectionLost(self, reason):
        """Log unclean disconnects and terminate the connection."""
        # Anything other than ConnectionDone gets logged as an error.
        closed_cleanly = reason.check(error.ConnectionDone)
        if not closed_cleanly:
            log.err(reason)

        self._on_terminate(connection.InternalCloseReasons.SOCKET_ERROR,
                           str(reason))

    def doRead(self):
        """Invoked by the reactor for a read event on the connection."""
        self._handle_read()

    def doWrite(self):
        """Invoked by the reactor for a write event on the connection."""
        self._handle_write()
        self._manage_event_state()
class TwistedProtocolConnection(base_connection.BaseConnection):
    """A hybrid between a Pika Connection and a Twisted Protocol.

    Allows using Twisted's non-blocking connectTCP/connectSSL methods for
    connecting to the server.

    It has one caveat: TwistedProtocolConnection objects have a ready
    instance variable that's a Deferred which fires when the connection is
    ready to be used (the initial AMQP handshaking has been done). You *have*
    to wait for this Deferred to fire before requesting a channel.

    Since it's Twisted handling connection establishing it does not accept
    connect callbacks, you have to implement that within Twisted. Also
    remember that the host, port and ssl values of the connection parameters
    are ignored because, yet again, it's Twisted who manages the connection.
    """

    def __init__(self, parameters=None, on_close_callback=None):
        # Handed over (and replaced by None) by the first of
        # connectionReady, connectionFailed or connectionLost to run.
        self.ready = defer.Deferred()
        ioloop_adapter = IOLoopReactorAdapter(self, reactor)
        super(TwistedProtocolConnection, self).__init__(
            parameters=parameters,
            on_open_callback=self.connectionReady,
            on_open_error_callback=self.connectionFailed,
            on_close_callback=on_close_callback,
            ioloop=ioloop_adapter,
            stop_ioloop_on_close=False)

    def connect(self):
        """Only record the initial connection state.

        Twisted opens the connection asynchronously, so no actual connecting
        happens here.
        """
        self._set_connection_state(self.CONNECTION_INIT)

    def _adapter_connect(self):
        # Not expected to be called: connect() is overridden and Twisted
        # builds the TCP connection. Implemented only to keep the interface.
        return False

    def _adapter_disconnect(self):
        # Disconnect from the server by closing the Twisted transport.
        self.transport.loseConnection()

    def _flush_outbound(self):
        """Override BaseConnection._flush_outbound to send all buffered data
        the Twisted way, by writing to the transport. No need for buffering,
        Twisted handles that for us.
        """
        while self.outbound_buffer:
            send = self.transport.write
            send(self.outbound_buffer.popleft())

    def channel(self, channel_number=None):
        """Create a new channel with the next available channel number or
        pass in a channel number to use. Must be non-zero if you would like
        to specify but it is recommended that you let Pika manage the channel
        numbers.

        Return a Deferred that fires with an instance of a wrapper around the
        Pika Channel class.

        :param int channel_number: The channel number to use, defaults to the
                                   next available.

        """
        channel_opened = defer.Deferred()
        # The Deferred's callback is what the base class invokes once the
        # channel is open.
        base_connection.BaseConnection.channel(self, channel_opened.callback,
                                               channel_number)
        # Wrap the resulting Pika channel before handing it to the caller.
        channel_opened.addCallback(TwistedChannel)
        return channel_opened

    # IProtocol methods

    def dataReceived(self, data):
        # Hand the received bytes to Pika for parsing.
        self._on_data_available(data)

    def connectionLost(self, reason):
        # Report the loss through ``ready``, if it has not been used yet.
        pending, self.ready = self.ready, None
        if not pending:
            return
        pending.errback(reason)

    def makeConnection(self, transport):
        # Keep the transport for writing, then announce the connection.
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        # Tell everyone we're connected
        self._on_connected()

    # Our own methods

    def connectionReady(self, res):
        # on_open_callback: fire ``ready`` with the value Pika passes in.
        pending, self.ready = self.ready, None
        if not pending:
            return
        pending.callback(res)

    def connectionFailed(self, connection_unused, error_message=None):
        # on_open_error_callback: fail ``ready`` with an AMQPConnectionError
        # built from the configured number of connection attempts.
        pending, self.ready = self.ready, None
        if not pending:
            return
        attempts = self.params.connection_attempts
        exc = exceptions.AMQPConnectionError(attempts)
        pending.errback(exc)