adapters

Note

The following class-level documentation is not intended for those using Pika in their applications. It is for those who are extending Pika or otherwise working on the driver itself. For an overview of how to use adapters, please reference the Connecting to RabbitMQ documentation.

base_connection

Base class extended by connection adapters. This extends the connection.Connection class to encapsulate connection behavior but still isolate socket and low level communication.

BaseConnection

class pika.adapters.base_connection.BaseConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)[source]

BaseConnection class that should be extended by connection adapters

BaseConnection.READ = 1
BaseConnection.WRITE = 4
BaseConnection.ERROR = 8
BaseConnection.ERRORS_TO_IGNORE = [11, 11, 4]
BaseConnection.DO_HANDSHAKE = True
BaseConnection.WARN_ABOUT_IOLOOP = False
BaseConnection.add_timeout(deadline, callback_method)[source]

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:

str

BaseConnection.close(reply_code=200, reply_text='Normal shutdown')[source]

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
BaseConnection.remove_timeout(timeout_id)[source]

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
BaseConnection._adapter_connect()[source]

Connect to the RabbitMQ broker

BaseConnection._adapter_disconnect()[source]

Invoked if the connection is being told to disconnect

BaseConnection._check_state_on_disconnect()[source]

Checks whether we were in the process of opening a connection with RabbitMQ when we were disconnected, and raises exceptions for the anticipated exception types.

BaseConnection._create_and_connect_to_socket()[source]

Create socket and connect to it, using SSL if enabled.

BaseConnection._do_ssl_handshake()[source]

Perform SSL handshaking, copied from python stdlib test_ssl.py.

BaseConnection._get_error_code(error_value)[source]

Get the error code from the error_value accounting for Python version differences.

Return type:int
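
As a rough illustration of what "accounting for Python version differences" can mean, the sketch below pulls an integer code out of either a plain int, an exception with an errno attribute, or an older-style exception that carries the code as its first argument. The helper names get_error_code and should_ignore are hypothetical and this is not Pika's implementation. The ignore list is an assumption: the values 11, 11, 4 shown for ERRORS_TO_IGNORE appear to correspond to EWOULDBLOCK, EAGAIN and EINTR on Linux.

```python
import errno

# Assumption: these mirror the ERRORS_TO_IGNORE values listed above
# (on Linux, EWOULDBLOCK and EAGAIN are both 11 and EINTR is 4).
ERRORS_TO_IGNORE = [errno.EWOULDBLOCK, errno.EAGAIN, errno.EINTR]


def get_error_code(error_value):
    """Return an integer error code from an int or an exception (hypothetical helper)."""
    if error_value is None:
        return None
    if isinstance(error_value, int):
        return error_value
    # Exceptions derived from OSError expose the code as .errno
    code = getattr(error_value, "errno", None)
    if code is not None:
        return code
    # Older-style errors carry the code as the first positional argument
    args = getattr(error_value, "args", ())
    return args[0] if args and isinstance(args[0], int) else None


def should_ignore(error_value):
    """True when the error is one a non-blocking socket can simply retry."""
    return get_error_code(error_value) in ERRORS_TO_IGNORE
```

A handler built this way would return early when should_ignore is true and treat every other code as a reason to disconnect.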
BaseConnection._flush_outbound()[source]

Call the state manager, which will determine that we need to write.

BaseConnection._handle_disconnect()[source]

Called internally when the socket is already disconnected.

BaseConnection._handle_ioloop_stop()[source]

Invoked when the connection is closed to determine if the IOLoop should be stopped or not.

BaseConnection._handle_error(error_value)[source]

Internal error handling method. Here we expect a socket.error coming in and will handle different socket errors differently.

Parameters:error_value (int|object) – The inbound error
BaseConnection._handle_events(fd, events, error=None, write_only=False)[source]

Handle IO/Event loop events, processing them.

Parameters:
  • fd (int) – The file descriptor for the events
  • events (int) – Events from the IO/Event loop
  • error (int) – Was an error specified
  • write_only (bool) – Only handle write events
BaseConnection._handle_read()[source]

Read from the socket and call our on_data_available with the data.

BaseConnection._handle_write()[source]

Handle any outbound buffer writes that need to take place.

BaseConnection._init_connection_state()[source]

Initialize or reset all of our internal state variables for a given connection. If we disconnect and reconnect, all of our state needs to be wiped.

BaseConnection._manage_event_state()[source]

Manage the bitmask for reading/writing/error which is used by the io/event handler to specify when there is an event such as a read or write.
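
The bitmask idea can be sketched with the READ, WRITE and ERROR values listed for BaseConnection above. The policy shown here (always watch for reads and errors, and add the write bit only while outbound data is queued) is an assumption made for illustration, and desired_event_state and describe are hypothetical names rather than Pika methods.

```python
READ, WRITE, ERROR = 1, 4, 8  # values listed for BaseConnection above


def desired_event_state(outbound_buffer):
    """Build the event mask: reads and errors always, writes only when data is queued."""
    state = READ | ERROR
    if outbound_buffer:
        state |= WRITE
    return state


def describe(events):
    """Decode a mask back into flag names, which is handy when logging."""
    flags = (("READ", READ), ("WRITE", WRITE), ("ERROR", ERROR))
    return [name for name, bit in flags if events & bit]
```

With an empty buffer the mask is 9 (READ and ERROR); once a frame is queued it becomes 13, which tells the event loop to also report when the socket is writable.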

BaseConnection._wrap_socket(sock)[source]

Wrap the socket for connecting over SSL.

Return type:ssl.SSLSocket

asyncore_connection

Use pika with the stdlib asyncore module

AsyncoreConnection

class pika.adapters.asyncore_connection.AsyncoreConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)[source]
AsyncoreConnection._adapter_connect()[source]

Connect to our RabbitMQ broker using AsyncoreDispatcher, then set Pika’s suggested buffer size for socket reading and writing. We pass the handle to self so that the AsyncoreDispatcher object can call back into our various state methods.

PikaDispatcher

class pika.adapters.asyncore_connection.PikaDispatcher(sock=None, map=None, event_callback=None)[source]
PikaDispatcher.READ = 1
PikaDispatcher.WRITE = 4
PikaDispatcher.ERROR = 8
PikaDispatcher.add_timeout(deadline, handler)[source]

Add a timeout with the given deadline. Should return a timeout id.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

str

PikaDispatcher.readable()[source]
PikaDispatcher.writable()[source]
PikaDispatcher.handle_read()[source]
PikaDispatcher.handle_write()[source]
PikaDispatcher.process_timeouts()[source]

Process the self._timeouts event stack

PikaDispatcher.remove_timeout(timeout_id)[source]

Remove a timeout if it’s still in the timeout stack

Parameters:timeout_id (str) – The timeout id to remove
PikaDispatcher.start()[source]
PikaDispatcher.stop()[source]
PikaDispatcher.update_handler(fileno_unused, events)[source]

Set the events to the current events

Parameters:
  • fileno_unused (int) – The file descriptor
  • events (int) – The event mask

blocking_connection

Implement a blocking, procedural style connection adapter on top of the asynchronous core.

BlockingConnection

class pika.adapters.blocking_connection.BlockingConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)[source]

The BlockingConnection adapter is meant for simple implementations where you want to have blocking behavior. The behavior is layered on top of the async library. Because of the nature of AMQP, there are a few callbacks one needs to handle, even in a blocking implementation. These include receiving messages from Basic.Deliver, Basic.GetOk, and Basic.Return.

BlockingConnection.WRITE_TO_READ_RATIO = 1000
BlockingConnection.DO_HANDSHAKE = True
BlockingConnection.SLEEP_DURATION = 0.1
BlockingConnection.SOCKET_CONNECT_TIMEOUT = 0.25
BlockingConnection.SOCKET_TIMEOUT_THRESHOLD = 12
BlockingConnection.SOCKET_TIMEOUT_CLOSE_THRESHOLD = 3
BlockingConnection.SOCKET_TIMEOUT_MESSAGE = 'Timeout exceeded, disconnected'
BlockingConnection.add_timeout(deadline, callback)[source]

Add the callback to the IOLoop timer to fire after deadline seconds.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback (method) – The callback method
Return type:

str

BlockingConnection.channel(channel_number=None)[source]

Create a new channel with the next available or specified channel number.

Parameters:channel_number (int) – Specify the channel number
BlockingConnection.close(reply_code=200, reply_text='Normal shutdown')[source]

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
BlockingConnection.disconnect()[source]

Disconnect from the socket

BlockingConnection.process_data_events()[source]

Will make sure that data events are processed. Your app can block on this method.

BlockingConnection.process_timeouts()[source]

Process the self._timeouts event stack

BlockingConnection.remove_timeout(timeout_id)[source]

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Parameters:timeout_id (str) – The id of the timeout to remove
BlockingConnection.send_method(channel_number, method_frame, content=None)[source]

Constructs an RPC method frame and then sends it to the broker.

Parameters:
  • channel_number (int) – The channel number for the frame
  • method_frame (pika.object.Method) – The method frame to send
  • content (tuple) – If set, the content frame, given as a tuple of properties and body.
BlockingConnection.sleep(duration)[source]

A safer way to sleep than calling time.sleep() directly, because it keeps the adapter from ignoring frames sent from RabbitMQ. The connection will “sleep”, or block, for the number of seconds specified in duration, in small intervals.

Parameters:duration (int) – The time to sleep
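
A minimal sketch of sleeping in small intervals follows. It is not the adapter's code: sleep_processing_events is a hypothetical function, process_events stands in for whatever services the socket (for example process_data_events), and the 0.1 second default interval is assumed to relate to SLEEP_DURATION above.

```python
import time


def sleep_processing_events(duration, process_events, interval=0.1):
    """Block for about `duration` seconds, servicing events between short naps."""
    deadline = time.monotonic() + duration
    calls = 0
    while True:
        process_events()  # give inbound frames a chance to be handled
        calls += 1
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never nap longer than the time that is left
        time.sleep(min(interval, remaining))
    return calls
```

The point of the design is that the caller still blocks for the full duration, but the connection is never left unattended for longer than one interval.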
BlockingConnection._adapter_connect()[source]

Connect to the RabbitMQ broker

BlockingConnection._adapter_disconnect()[source]

Called if the connection is being requested to disconnect.

BlockingConnection._call_timeout_method(timeout_value)[source]

Execute the method that was scheduled to be called.

Parameters:timeout_value (dict) – The configuration for the timeout
BlockingConnection._deadline_passed(timeout_id)[source]

Returns True if the deadline has passed for the specified timeout_id.

Parameters:timeout_id (str) – The id of the timeout to check
Return type:bool
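
The timeout methods above (add_timeout, remove_timeout, process_timeouts and _deadline_passed) fit together roughly as in the following sketch. TimeoutStack is a hypothetical stand-in for the adapter's self._timeouts bookkeeping; the id format, the dict layout and the injectable clock are assumptions made so the example can be run without a broker.

```python
import itertools
import time


class TimeoutStack:
    """Hypothetical stand-in for an adapter's self._timeouts bookkeeping."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._timeouts = {}
        self._ids = itertools.count(1)

    def add_timeout(self, deadline, callback):
        """Schedule callback to run `deadline` seconds from now; return a str id."""
        timeout_id = "timeout-%d" % next(self._ids)
        self._timeouts[timeout_id] = {
            "deadline": self._clock() + deadline,
            "method": callback,
        }
        return timeout_id

    def remove_timeout(self, timeout_id):
        """Remove a timeout if it is still pending; unknown ids are ignored."""
        self._timeouts.pop(timeout_id, None)

    def _deadline_passed(self, timeout_id):
        return self._clock() >= self._timeouts[timeout_id]["deadline"]

    def process_timeouts(self):
        """Call and discard every timeout whose deadline has passed."""
        for timeout_id in list(self._timeouts):
            # A callback may have removed this entry already
            if timeout_id in self._timeouts and self._deadline_passed(timeout_id):
                self._timeouts.pop(timeout_id)["method"]()
```

Calling process_timeouts once per pass of the event loop is enough to fire callbacks close to their deadline, and remove_timeout cancels one that has not fired yet.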
BlockingConnection._handle_disconnect()[source]

Called internally when the socket is already disconnected.

BlockingConnection._handle_read()[source]

If the ReadPoller says there is data to read, try to read it in the _handle_read of the parent class. Once read, reset the counter that keeps track of how many frames have been written since the last read.

BlockingConnection._handle_timeout()[source]

Invoked whenever the socket times out

BlockingConnection._flush_outbound()[source]

Flush the outbound socket buffer.

BlockingConnection._on_connection_closed(method_frame, from_adapter=False)[source]

Called when the connection is closed remotely. The from_adapter value will be true if the connection adapter has been disconnected from the broker and the method was invoked directly instead of by receiving a Connection.Close frame.

Parameters:
  • method_frame (pika.frame.Method) – The Connection.Close frame
  • from_adapter (bool) – Called by the connection adapter
Raises:

AMQPConnectionError

BlockingConnection._send_frame(frame_value)[source]

This appends the fully generated frame to send to the broker to the output buffer which will be then sent via the connection adapter.

Parameters:frame_value (pika.frame.Frame|pika.frame.ProtocolHeader) – The frame to write

select_connection

A connection adapter that tries to use the best polling method for the platform pika is running on.

SelectConnection

class pika.adapters.select_connection.SelectConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)[source]

An asynchronous connection adapter that attempts to use the fastest event loop adapter for the given platform.

SelectConnection._adapter_connect()[source]

Connect to the RabbitMQ broker

SelectConnection._flush_outbound()[source]

Call the state manager, which will determine that we need to write, then call the poller’s poll function to force it to process events.

IOLoop

class pika.adapters.select_connection.IOLoop(state_manager)[source]

Singleton wrapper that decides which type of poller to use, creates an instance of it in start_poller, and keeps the invoking application in a blocking state by calling the poller’s start method. The poller should keep looping until IOLoop.instance().stop() is called or there is a socket error.

Also provides a convenient pass-through for add_timeout and set_events
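
One plausible way to decide which type of poller to use is to probe the stdlib select module for what the platform offers, as sketched below. The function choose_poller_type is hypothetical and the preference order (epoll, then kqueue, then poll, then select) is an assumption; only the poller class names come from this page.

```python
import select


def choose_poller_type():
    """Return the name of the poller class this platform could support."""
    if hasattr(select, "epoll"):   # Linux
        return "EPollPoller"
    if hasattr(select, "kqueue"):  # BSD-based systems, including macOS
        return "KQueuePoller"
    if hasattr(select, "poll"):    # most Unix-like systems
        return "PollPoller"
    return "SelectPoller"          # available everywhere
```

Because select.select exists on every platform Python supports, the final branch gives a fallback that always works, which matches SelectPoller being described as the default.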

add_timeout(deadline, handler)[source]

Add a timeout with the given deadline. Should return a timeout id.

Pass through a deadline and handler to the active poller.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

int

poller_type[source]

Return the type of poller.

Return type:str
remove_timeout(timeout_id)[source]

Remove a timeout if it’s still in the timeout stack of the poller

Parameters:timeout_id (str) – The timeout id to remove
start()[source]

Start the IOLoop, waiting for a Poller to take over.

start_poller(handler, events, fileno)[source]

Start the Poller, once started will take over for IOLoop.start()

Parameters:
  • handler (method) – The method to call to handle events
  • events (int) – The events to handle
  • fileno (int) – The file descriptor to poll for
stop()[source]

Stop the poller’s event loop

update_handler(fileno, events)[source]

Pass in the events to process for the given file descriptor.

Parameters:
  • fileno (int) – The file descriptor to poll for
  • events (int) – The events to handle

SelectPoller

class pika.adapters.select_connection.SelectPoller(fileno, handler, events, state_manager)[source]

Default behavior is to use Select since it’s the most widely supported and has all of the methods we need for child classes as well. One should only need to override the update_handler and start methods for additional types.

TIMEOUT = 1
add_timeout(deadline, handler)[source]

Add a timeout with the given deadline. Should return a timeout id.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

str

flush_pending_timeouts()[source]
poll(write_only=False)[source]

Check to see if the events that are cared about have fired.

Parameters:write_only (bool) – Don’t look at self.events, just look to see if the adapter can write.
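
A single poll pass with select can be sketched as follows, using the READ, WRITE and ERROR values listed earlier on this page. poll_once is a hypothetical function, not the SelectPoller method: it translates the requested event mask into the three lists that select.select expects and translates the result back into a mask.

```python
import select
import socket

READ, WRITE, ERROR = 1, 4, 8  # values listed earlier on this page


def poll_once(fileno, events, timeout=1.0):
    """Wait up to `timeout` seconds and return the mask of events that fired."""
    read_fds = [fileno] if events & READ else []
    write_fds = [fileno] if events & WRITE else []
    error_fds = [fileno] if events & ERROR else []
    readable, writable, errored = select.select(
        read_fds, write_fds, error_fds, timeout)
    fired = 0
    if readable:
        fired |= READ
    if writable:
        fired |= WRITE
    if errored:
        fired |= ERROR
    return fired
```

A write_only pass in this scheme would amount to calling poll_once with just the WRITE bit, regardless of the events the handler currently cares about.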
process_timeouts()[source]

Process the self._timeouts event stack

remove_timeout(timeout_id)[source]

Remove a timeout if it’s still in the timeout stack

Parameters:timeout_id (str) – The timeout id to remove
start()[source]

Start the main poller loop. It will loop here until self.closed

update_handler(fileno, events)[source]

Set the events to the current events

Parameters:
  • fileno (int) – The file descriptor
  • events (int) – The event mask

KQueuePoller

class pika.adapters.select_connection.KQueuePoller(fileno, handler, events, state_manager)[source]

KQueuePoller works on BSD based systems and is faster than select

update_handler(fileno, events)[source]

Set the events to the current events

Parameters:
  • fileno (int) – The file descriptor
  • events (int) – The event mask
start()[source]

Start the main poller loop. It will loop here until self.closed

poll(write_only=False)[source]

Check to see if the events that are cared about have fired.

Parameters:write_only (bool) – Don’t look at self.events, just look to see if the adapter can write.
TIMEOUT = 1
add_timeout(deadline, handler)

Add a timeout with the given deadline. Should return a timeout id.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

str

flush_pending_timeouts()
process_timeouts()

Process the self._timeouts event stack

remove_timeout(timeout_id)

Remove a timeout if it’s still in the timeout stack

Parameters:timeout_id (str) – The timeout id to remove

PollPoller

class pika.adapters.select_connection.PollPoller(fileno, handler, events, state_manager)[source]

Poll works on Linux and can have better performance than EPoll in certain scenarios. Both are faster than select.

update_handler(fileno, events)[source]

Set the events to the current events

Parameters:
  • fileno (int) – The file descriptor
  • events (int) – The event mask
start()[source]

Start the main poller loop. It will loop here until self.closed

poll(write_only=False)[source]

Poll until TIMEOUT waiting for an event

Parameters:write_only (bool) – Only process write events
TIMEOUT = 1
add_timeout(deadline, handler)

Add a timeout with the given deadline. Should return a timeout id.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

str

flush_pending_timeouts()
process_timeouts()

Process the self._timeouts event stack

remove_timeout(timeout_id)

Remove a timeout if it’s still in the timeout stack

Parameters:timeout_id (str) – The timeout id to remove

EPollPoller

class pika.adapters.select_connection.EPollPoller(fileno, handler, events, state_manager)[source]

EPoll works on Linux and can have better performance than Poll in certain scenarios. Both are faster than select.

poll(write_only=False)[source]

Poll until TIMEOUT waiting for an event

Parameters:write_only (bool) – Only process write events
TIMEOUT = 1
add_timeout(deadline, handler)

Add a timeout with the given deadline. Should return a timeout id.

Parameters:
  • deadline (int) – The number of seconds to wait until calling handler
  • handler (method) – The method to call at deadline
Return type:

str

flush_pending_timeouts()
process_timeouts()

Process the self._timeouts event stack

remove_timeout(timeout_id)

Remove a timeout if it’s still in the timeout stack

Parameters:timeout_id (str) – The timeout id to remove
start()

Start the main poller loop. It will loop here until self.closed

update_handler(fileno, events)

Set the events to the current events

Parameters:
  • fileno (int) – The file descriptor
  • events (int) – The event mask

tornado_connection

Run pika on the Tornado IOLoop

TornadoConnection

class pika.adapters.tornado_connection.TornadoConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=False, custom_ioloop=None)[source]

The TornadoConnection runs on the Tornado IOLoop. If you’re running the connection in a web app, make sure stop_ioloop_on_close is set to False, which is the default for this adapter; otherwise the web app will stop taking requests when the connection closes.

TornadoConnection.WARN_ABOUT_IOLOOP = True
TornadoConnection._adapter_connect()[source]

Connect to the RabbitMQ broker

TornadoConnection._adapter_disconnect()[source]

Disconnect from the RabbitMQ broker

TornadoConnection.add_timeout(deadline, callback_method)[source]

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:

str
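
The distinction between the two timeout styles comes down to relative versus absolute time: this adapter takes seconds from now, while Tornado's IOLoop.add_timeout expects the point in time at which to fire. The conversion can be sketched as below; to_absolute_deadline is a hypothetical helper, and whether the adapter performs the conversion exactly this way is an assumption.

```python
import time


def to_absolute_deadline(seconds, now=None):
    """Convert 'seconds from now' into an absolute timestamp."""
    current = time.time() if now is None else now
    return current + seconds
```

Passing an absolute timestamp to the adapter's add_timeout would therefore schedule the callback far in the future, which is the confusion the note above warns about.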

TornadoConnection.remove_timeout(timeout_id)[source]

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str