Pika uses connection adapters to provide a flexible method for adapting pika’s
core communication to different IOLoop implementations. In addition to asynchronous adapters, there is the
BlockingConnection adapter that provides a more idiomatic procedural approach to using Pika.
Requesting message acknowledgements from another thread¶
The single-threaded usage constraint of an individual Pika connection adapter instance may result in a dropped AMQP/stream connection due to AMQP heartbeat timeout in consumers that take a long time to process an incoming message. A common solution is to delegate processing of the incoming messages to another thread, while the connection adapter’s thread continues to service its I/O loop’s message pump, permitting AMQP heartbeats and other I/O to be serviced in a timely fashion.
Messages processed in another thread may not be acknowledged directly from that thread, since all accesses to the connection adapter instance must be from a single thread, which is the thread running the adapter’s I/O loop. This is accomplished by requesting a callback to be executed in the adapter’s I/O loop thread. For example, the callback function’s implementation might look like this:
def ack_message(channel, delivery_tag): """Note that `channel` must be the same Pika channel instance via which the message being acknowledged was retrieved (AMQP protocol constraint). """ if channel.is_open: channel.basic_ack(delivery_tag) else: # Channel is already closed, so we can't acknowledge this message; # log and/or do something that makes sense for your app in this case. pass
The code running in the other thread may request the
to be executed in the connection adapter’s I/O loop thread using an
pika.BlockingConnectionabstracts its I/O loop from the application and thus exposes
pika.BlockingConnection.add_callback_threadsafe(). Refer to this method’s docstring for additional information. For example:
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
Please see the documentation of other adapters for their specific methods.
This threadsafe callback request mechanism may also be used to delegate publishing of messages, etc., from a background thread to the connection adapter’s thread.
The Pika library requires connection recovery to be performed by the application code and strive to make it a straightforward process. Pika falls into the second category.
Different connection adapters take different approaches to connection recovery.
pika.BlockingConnection adapter exception handling can be used to check
for connection errors. Here is a very basic example:
import pika while True: try: connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) channel.start_consuming() # Don't recover if connection was closed by broker except pika.exceptions.ConnectionClosedByBroker: break # Don't recover on channel errors except pika.exceptions.AMQPChannelError: break # Recover on all other connection errors except pika.exceptions.AMQPConnectionError: continue
This example can be found in examples/consume_recover.py.
Generic operation retry libraries such as retry can be used. Decorators make it possible to configure some additional recovery behaviours, like delays between retries and limiting the number of retries:
from retry import retry @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) def consume(): connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) try: channel.start_consuming() # Don't recover connections closed by server except pika.exceptions.ConnectionClosedByBroker: pass consume()
This example can be found in examples/consume_recover_retry.py.
For asynchronous adapters, use
on_close_callback to react to connection
failure events. This callback can be used to clean up and recover the
An example of recovery using
on_close_callback can be found in