Twisted Connection Adapter

Using Pika with a Twisted reactor.

Supports two methods of establishing the connection, using TwistedConnection or TwistedProtocolConnection. For details about each method, see the docstrings of the corresponding classes.

The interfaces in this module are Deferred-based when possible. The connection.channel() method and most of the channel methods return Deferreds instead of taking a callback argument, and basic_consume() returns a Deferred that fires with a Twisted DeferredQueue, where messages from the server will be stored, together with the consumer tag. Refer to the docstrings for TwistedConnection.channel() and the TwistedChannel class for details.

class pika.adapters.twisted_connection.TwistedConnection(parameters=None, on_open_callback=None, on_open_error_callback=None, on_close_callback=None, stop_ioloop_on_close=False)

A standard Pika connection adapter. Instantiate the class with the connection parameters and an on-open callback; once that callback is called, the connection is ready to use.

The drawback is that the connection is established with the blocking socket module. For instance, if the host you are connecting to is behind a misconfigured firewall that silently drops packets, the whole process freezes until the connection timeout expires. To work around this, use TwistedProtocolConnection, but read its docstring first.

Objects of this class are registered with the Twisted reactor, which notifies them when the socket connection becomes readable or writable. Apart from implementing the BaseConnection interface, they therefore also provide Twisted’s IReadWriteDescriptor interface.
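A minimal sketch of the construction described above, assuming a Pika release that still ships TwistedConnection, an installed Twisted, and a broker reachable on localhost. The names on_open, on_channel, on_open_error, on_close and main are hypothetical; the callback argument lists follow the descriptions of add_on_open_error_callback and add_on_close_callback in this module.

```python
def on_channel(channel):
    # Fires with the TwistedChannel wrapper once the channel is open.
    print("channel ready: %r" % (channel,))
    return channel


def on_open(connection):
    # The connection is usable now; channel() returns a Deferred.
    d = connection.channel()
    d.addCallback(on_channel)
    return d


def on_open_error(connection, error_message=None):
    print("could not connect: %s" % (error_message,))


def on_close(connection, reply_code, reply_text):
    print("connection closed: %s %s" % (reply_code, reply_text))


def main():
    # Imported lazily so the callbacks above can be read and exercised
    # without Pika or Twisted installed.
    import pika
    from pika.adapters.twisted_connection import TwistedConnection
    from twisted.internet import reactor

    # Constructing the object starts the (blocking) connection attempt.
    TwistedConnection(
        pika.ConnectionParameters(host="localhost"),  # assumed broker host
        on_open_callback=on_open,
        on_open_error_callback=on_open_error,
        on_close_callback=on_close,
    )
    reactor.run()
```

Calling main() starts the reactor and blocks until it is stopped.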

add_backpressure_callback(callback_method)

Call callback_method when Pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_callback_threadsafe(callback)

Requests a call to the given function as soon as possible in the context of this connection’s IOLoop thread.

NOTE: This is the only thread-safe method offered by the connection. All other manipulations of the connection must be performed from the connection’s thread.

For example, a thread may request a call to the channel.basic_ack method of a connection that is running in a different thread via

```
connection.add_callback_threadsafe(
    functools.partial(channel.basic_ack, delivery_tag=…))
```

Parameters:callback (method) – The callback method; must be callable.
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed. The callback will be passed the connection, the reply_code (int) and the reply_text (str), if sent by the remote server.

Parameters:callback_method (method) – Callback to call on close
add_on_connection_blocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Blocked frame indicating that RabbitMQ is low on resources. Publishers can use this to voluntarily suspend publishing, instead of relying on back pressure throttling. The callback will be passed the Connection.Blocked method frame.

See also ConnectionParameters.blocked_connection_timeout.

Parameters:callback_method (method) – Callback to call on Connection.Blocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Blocked
add_on_connection_unblocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Unblocked frame letting publishers know it’s ok to start publishing again. The callback will be passed the Connection.Unblocked method frame.

Parameters:callback_method (method) – Callback to call on Connection.Unblocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Unblocked
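One way a publisher might use these two callbacks to suspend publishing voluntarily is a small gate object. This is only a sketch: PublishGate and its attach, on_blocked, on_unblocked and may_publish methods are hypothetical names, and the connection argument is assumed to be any object offering the two registration methods documented above.

```python
class PublishGate(object):
    """Tracks whether the broker has asked publishers to pause."""

    def __init__(self):
        self.blocked = False

    def attach(self, connection):
        # Register both notifications on the connection.
        connection.add_on_connection_blocked_callback(self.on_blocked)
        connection.add_on_connection_unblocked_callback(self.on_unblocked)

    def on_blocked(self, method_frame):
        # method_frame.method is of type pika.spec.Connection.Blocked.
        self.blocked = True

    def on_unblocked(self, method_frame):
        # method_frame.method is of type pika.spec.Connection.Unblocked.
        self.blocked = False

    def may_publish(self):
        return not self.blocked
```

Publishing code would check gate.may_publish() before each publish and hold messages back while it returns False.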
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – Callback to call when open
add_on_open_error_callback(callback_method, remove_default=True)

Add a callback notification when the connection cannot be opened.

The callback method should accept the connection object that could not connect, and an optional error message.

Parameters:
  • callback_method (method) – Callback to call when can’t connect
  • remove_default (bool) – Remove default exception raising callback
add_timeout(deadline, callback_method)

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:str

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(channel_number=None)

Return a Deferred that fires with an instance of a wrapper around the Pika Channel class.

close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
connect()

Invoke if trying to reconnect to a RabbitMQ server. Constructing the Connection object should connect on its own.

consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed, False otherwise.

is_closing

Returns True if connection is in the process of closing due to client-initiated close request, but closing is not yet complete.

is_open

Returns True if the connection is open, False otherwise.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
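The add_timeout and remove_timeout pair can be combined as follows. This is a sketch under the assumption that connection is any object exposing the two methods documented here; call_later_cancellable and cancel are hypothetical names.

```python
def call_later_cancellable(connection, seconds, callback):
    """Schedule callback and return a function that cancels it."""
    # add_timeout returns the handle that remove_timeout expects.
    timeout_id = connection.add_timeout(seconds, callback)

    def cancel():
        connection.remove_timeout(timeout_id)

    return cancel
```

Keeping the handle inside a closure means callers never have to store the timeout ID themselves.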
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set
class pika.adapters.twisted_connection.TwistedProtocolConnection(parameters=None, on_close_callback=None)

A hybrid between a Pika Connection and a Twisted Protocol. Allows using Twisted’s non-blocking connectTCP/connectSSL methods for connecting to the server.

It has one caveat: TwistedProtocolConnection objects have a ready instance variable that’s a Deferred which fires when the connection is ready to be used (the initial AMQP handshaking has been done). You have to wait for this Deferred to fire before requesting a channel.

Because Twisted handles establishing the connection, this class does not accept connect callbacks; you have to implement that within Twisted. Also remember that the host, port and ssl values of the connection parameters are ignored because, again, it is Twisted that manages the connection.
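The connection flow described above might look like the following sketch, which uses Twisted's ClientCreator to build the protocol and then waits on ready before requesting a channel. The names connect and on_ready and the localhost/5672 defaults are hypothetical, and the sketch assumes that ready fires with the connection object itself, which this section does not state.

```python
def on_ready(connection):
    # Safe to request a channel only after `ready` has fired.
    return connection.channel()


def connect(host="localhost", port=5672):
    # Imported lazily so on_ready can be read and exercised without
    # Pika or Twisted installed.
    import pika
    from pika.adapters.twisted_connection import TwistedProtocolConnection
    from twisted.internet import protocol, reactor

    # host, port and ssl in the parameters are ignored by this adapter,
    # so the address is given to connectTCP instead.
    parameters = pika.ConnectionParameters()
    creator = protocol.ClientCreator(
        reactor, TwistedProtocolConnection, parameters)

    d = creator.connectTCP(host, port)
    # Chain onto the `ready` Deferred: it fires after the AMQP handshake.
    d.addCallback(lambda connection: connection.ready)
    d.addCallback(on_ready)
    return d
```

The Deferred returned by connect() fires with the channel wrapper; the caller still has to start the reactor.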

add_backpressure_callback(callback_method)

Call callback_method when Pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_callback_threadsafe(callback)

Requests a call to the given function as soon as possible in the context of this connection’s IOLoop thread.

NOTE: This is the only thread-safe method offered by the connection. All other manipulations of the connection must be performed from the connection’s thread.

For example, a thread may request a call to the channel.basic_ack method of a connection that is running in a different thread via

```
connection.add_callback_threadsafe(
    functools.partial(channel.basic_ack, delivery_tag=…))
```

Parameters:callback (method) – The callback method; must be callable.
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed. The callback will be passed the connection, the reply_code (int) and the reply_text (str), if sent by the remote server.

Parameters:callback_method (method) – Callback to call on close
add_on_connection_blocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Blocked frame indicating that RabbitMQ is low on resources. Publishers can use this to voluntarily suspend publishing, instead of relying on back pressure throttling. The callback will be passed the Connection.Blocked method frame.

See also ConnectionParameters.blocked_connection_timeout.

Parameters:callback_method (method) – Callback to call on Connection.Blocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Blocked
add_on_connection_unblocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Unblocked frame letting publishers know it’s ok to start publishing again. The callback will be passed the Connection.Unblocked method frame.

Parameters:callback_method (method) – Callback to call on Connection.Unblocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Unblocked
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – Callback to call when open
add_on_open_error_callback(callback_method, remove_default=True)

Add a callback notification when the connection cannot be opened.

The callback method should accept the connection object that could not connect, and an optional error message.

Parameters:
  • callback_method (method) – Callback to call when can’t connect
  • remove_default (bool) – Remove default exception raising callback
add_timeout(deadline, callback_method)

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:str

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(channel_number=None)

Create a new channel with the next available channel number, or pass in a channel number to use. A channel number you specify must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Return a Deferred that fires with an instance of a wrapper around the Pika Channel class.

Parameters:channel_number (int) – The channel number to use, defaults to the next available.
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
connect()

Invoke if trying to reconnect to a RabbitMQ server. Constructing the Connection object should connect on its own.

consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed, False otherwise.

is_closing

Returns True if connection is in the process of closing due to client-initiated close request, but closing is not yet complete.

is_open

Returns True if the connection is open, False otherwise.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set
class pika.adapters.twisted_connection.TwistedChannel(channel)

A wrapper around Pika’s Channel.

Channel methods that normally take a callback argument are wrapped to return a Deferred that fires with whatever arguments the callback to the original method would receive. If the channel gets closed, all pending Deferreds are errbacked with a ChannelClosed exception.

The basic_consume method is wrapped in a special way; see its docstring for details.

basic_consume(*args, **kwargs)

Consume from a server queue. Returns a Deferred that fires with a tuple: (queue_object, consumer_tag). The queue object is an instance of ClosableDeferredQueue, where data received from the queue will be stored. Clients should use its get() method to fetch individual messages.
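A rough sketch of fetching one message through the queue object follows. The names handle_message, read_one and start_consuming are hypothetical, and the sketch assumes that each item delivered by get() is a (channel, method, properties, body) tuple, which this section does not spell out.

```python
def handle_message(message):
    # Assumed item layout: (channel, method, properties, body).
    channel, method, properties, body = message
    print("received: %r" % (body,))
    # Acknowledge the delivery so the broker can discard the message.
    return channel.basic_ack(delivery_tag=method.delivery_tag)


def read_one(queue_object):
    # get() returns a Deferred that fires with the next stored message.
    d = queue_object.get()
    d.addCallback(handle_message)
    return d


def start_consuming(channel, queue_name):
    # basic_consume fires with the tuple (queue_object, consumer_tag).
    d = channel.basic_consume(queue=queue_name)
    d.addCallback(lambda result: read_one(result[0]))
    return d
```

To keep consuming, read_one would be called again after each message, for example from a Twisted LoopingCall.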

basic_publish(*args, **kwargs)

Make sure the channel is not closed and then publish. Return a Deferred that fires with the result of the channel’s basic_publish.

queue_delete(*args, **kwargs)

Wraps the method the same way all the others are wrapped, but removes the reference to the queue object after it gets deleted on the server.