Connecting to RabbitMQ

Pika provides multiple adapters to connect to RabbitMQ allowing for different ways of providing socket communication depending on what is appropriate for your application.

  • SelectConnection: A native event based connection adapter that implements select, kqueue, poll and epoll.
  • AsyncoreConnection: Legacy adapter kept for convenience of previous Pika users.
  • TornadoConnection: Connection adapter for use with the Tornado IO Loop.
  • BlockingConnection: Enables blocking, synchronous operation on top of the library for simple uses.

IO and Event Looping

Due to the need to check for and send content on a consistent basis, Pika now implements or extends IOLoops in each of its asynchronous connection adapters. These IOLoops are blocking methods which loop and listen for events. Each asynchronous adapter follows the same standard for invoking the IOLoop. The IOLoop is created when the connection adapter is created. To start it, simply call the connection.ioloop.start() method.

If you are using an external IOLoop such as Tornado’s IOLoop, you may invoke that as you normally would and then add the adapter to it.

Example:

from pika.adapters import SelectConnection

# Create our connection object
connection = SelectConnection()

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
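
To make the blocking behavior of start() more concrete, the following is a minimal sketch of a timer-driven loop. ToyIOLoop and all of its methods are hypothetical stand-ins written for this illustration and are not Pika's implementation; the sketch only mimics the pattern described above, in which start() blocks until the loop is stopped.

```python
import time


class ToyIOLoop(object):
    """Hypothetical stand-in for an adapter's IOLoop (not Pika's code)."""

    def __init__(self):
        self._running = False
        self._timeouts = {}   # timeout_id -> (absolute due time, callback)
        self._next_id = 0

    def add_timeout(self, deadline, callback):
        """Schedule callback to fire `deadline` seconds from now."""
        self._next_id += 1
        timeout_id = str(self._next_id)
        self._timeouts[timeout_id] = (time.time() + deadline, callback)
        return timeout_id

    def remove_timeout(self, timeout_id):
        """Cancel a pending timeout; unknown ids are ignored."""
        self._timeouts.pop(timeout_id, None)

    def stop(self):
        self._running = False

    def start(self):
        """Block, firing due timeouts, until stop() is called."""
        self._running = True
        while self._running:
            now = time.time()
            for timeout_id, (due, callback) in list(self._timeouts.items()):
                if due <= now:
                    # Remove before calling so the callback may reschedule
                    self._timeouts.pop(timeout_id, None)
                    callback()
            time.sleep(0.001)


fired = []
loop = ToyIOLoop()
cancelled = loop.add_timeout(0.01, lambda: fired.append("cancelled"))
loop.add_timeout(0.02, lambda: fired.append("tick"))
loop.add_timeout(0.05, loop.stop)
loop.remove_timeout(cancelled)
loop.start()   # blocks here until the stop() timeout fires
```

A real adapter also waits on socket events inside the loop; the sketch leaves that out and polls timers only.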

Continuation-Passing Style

Interfacing with Pika asynchronously is done by passing in callback methods you would like to have invoked when a certain event has completed. For example, if you are going to declare a queue, you pass in a method that will be called when the RabbitMQ server returns a Queue.DeclareOk response.

In our example below we use the following five easy steps:

  1. We start by creating our connection object, then starting our event loop.
  2. When we are connected, the on_connected method is called. In that method we create a channel.
  3. When the channel is created, the on_channel_open method is called. In that method we declare a queue.
  4. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us.
  5. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame and Body.

Note

Step #1 appears at the end of the example, after the functions used in Steps #2 through #5. This is so that Python knows about those functions before the connection is created.

Example:

import pika

# Create a global channel variable to hold our channel object in
channel = None

# Step #2
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    # Open a channel
    connection.channel(on_channel_open)

# Step #3
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)

# Step #4
def on_queue_declared(frame):
    """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
    channel.basic_consume(handle_delivery, queue='test')

# Step #5
def handle_delivery(channel, method, header, body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_connected)

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
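
The order in which the callbacks fire can be traced without a broker. The sketch below replaces the connection and channel with FakeConnection and FakeChannel, two hypothetical classes invented for this illustration that answer every request immediately; they are not part of Pika and accept only the arguments the sketch needs.

```python
class FakeChannel(object):
    """Hypothetical stand-in for a channel; it replies immediately."""

    def __init__(self, trace):
        self.trace = trace
        self.consumer = None

    def queue_declare(self, queue, callback):
        self.trace.append("queue_declare(%s)" % queue)
        callback("Queue.DeclareOk")   # simulated broker response

    def basic_consume(self, consumer_callback, queue):
        self.trace.append("basic_consume(%s)" % queue)
        self.consumer = consumer_callback

    def deliver(self, body):
        """Simulate RabbitMQ delivering one message to the consumer."""
        self.consumer(self, "method-frame", "header-frame", body)


class FakeConnection(object):
    """Hypothetical stand-in for a connection adapter."""

    def __init__(self, on_open_callback, trace):
        self.trace = trace
        self.channel_obj = FakeChannel(trace)
        self.trace.append("connected")
        on_open_callback(self)

    def channel(self, on_open_callback):
        self.trace.append("channel opened")
        on_open_callback(self.channel_obj)


trace = []      # order in which the steps happened
received = []   # message bodies handed to handle_delivery
state = {}      # holds the channel, like the global in the example


def on_connected(connection):
    connection.channel(on_channel_open)


def on_channel_open(new_channel):
    state["channel"] = new_channel
    new_channel.queue_declare(queue="test", callback=on_queue_declared)


def on_queue_declared(frame):
    state["channel"].basic_consume(handle_delivery, queue="test")


def handle_delivery(channel, method, header, body):
    received.append(body)


connection = FakeConnection(on_connected, trace)
connection.channel_obj.deliver("Hello World!")
```

Each callback only starts the next request and returns, which is why the whole chain is driven by responses rather than by a top-to-bottom script.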

Credentials

The credentials module provides the mechanism by which you pass the username and password to the connection.ConnectionParameters() class when it is created.

class pika.credentials.PlainCredentials(username, password, erase_on_connect=False)

The PlainCredentials class returns the properly formatted username and password to the Connection. As of this version of Pika, only PlainCredentials are supported. To authenticate with Pika, simply create a credentials object passing in the username and password and pass that to the ConnectionParameters object.

If you do not pass in credentials to the ConnectionParameters object, it will create credentials for ‘guest’ with the password of ‘guest’.

If you pass True to erase_on_connect the credentials will not be stored in memory after the Connection attempt has been made.

response_for(start)

Validate that this type of authentication is supported

Parameters:start (spec.Connection.Start) – Connection.Start method
Return type:tuple(str|None, str|None)
erase_credentials()

Called by Connection when it no longer needs the credentials

Example:

import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials)
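
As a rough illustration of what response_for and erase_credentials are described as doing, the sketch below uses a hypothetical SketchCredentials class. It is not pika.credentials.PlainCredentials: for simplicity it takes the broker's advertised mechanisms as a plain space-separated string rather than a Connection.Start method, and the response format shown is the standard SASL PLAIN layout, assumed here rather than taken from Pika's source.

```python
class SketchCredentials(object):
    """Hypothetical illustration; not pika.credentials.PlainCredentials."""

    TYPE = "PLAIN"

    def __init__(self, username, password, erase_on_connect=False):
        self.username = username
        self.password = password
        self.erase_on_connect = erase_on_connect

    def response_for(self, mechanisms):
        """Return (mechanism, response), or (None, None) if PLAIN is not offered.

        `mechanisms` is assumed to be a space-separated string.
        """
        if self.TYPE not in mechanisms.split():
            return None, None
        # SASL PLAIN: empty authorization id, then the username and the
        # password, each preceded by a NUL character
        return self.TYPE, "\0%s\0%s" % (self.username, self.password)

    def erase_credentials(self):
        """Forget the secrets if erase_on_connect was requested."""
        if self.erase_on_connect:
            self.username = None
            self.password = None


creds = SketchCredentials("guest", "guest", erase_on_connect=True)
offered = creds.response_for("AMQPLAIN PLAIN")
refused = creds.response_for("EXTERNAL")
creds.erase_credentials()
```

The two-element return value mirrors the documented return type, where both elements are None when the mechanism is not supported.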

Connection Parameters

There are two types of connection parameter classes in Pika to allow you to pass the connection information into a connection adapter, ConnectionParameters and URLParameters. Both classes share the same default connection values.

Default Parameter Values

The connection parameters classes extend pika.connection.Parameters to create a consistent definition of default values and internal attributes.

class pika.connection.Parameters

Base connection parameters class definition

Parameters:
  • DEFAULT_HOST (str) – ‘localhost’
  • DEFAULT_PORT (int) – 5672
  • DEFAULT_VIRTUAL_HOST (str) – ‘/’
  • DEFAULT_USERNAME (str) – ‘guest’
  • DEFAULT_PASSWORD (str) – ‘guest’
  • DEFAULT_HEARTBEAT_INTERVAL (int) – 0
  • DEFAULT_CHANNEL_MAX (int) – 0
  • DEFAULT_FRAME_MAX (int) – pika.spec.FRAME_MAX_SIZE
  • DEFAULT_LOCALE (str) – ‘en_US’
  • DEFAULT_CONNECTION_ATTEMPTS (int) – 1
  • DEFAULT_RETRY_DELAY (int|float) – 2.0
  • DEFAULT_SOCKET_TIMEOUT (int|float) – 0.25
  • DEFAULT_SSL (bool) – False
  • DEFAULT_SSL_OPTIONS (dict) – {}
  • DEFAULT_SSL_PORT (int) – 5671
  • DEFAULT_BACKPRESSURE_DETECTION (bool) – False
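
One way to picture how these defaults are used: an argument left as None falls back to the class-level value. The classes below are hypothetical and cover only three of the values listed above; they sketch the fallback pattern and are not Pika's implementation.

```python
class SketchParameters(object):
    """Hypothetical base class holding a few of the defaults listed above."""

    DEFAULT_HOST = "localhost"
    DEFAULT_PORT = 5672
    DEFAULT_VIRTUAL_HOST = "/"


class SketchConnectionParameters(SketchParameters):
    """Hypothetical subclass: None means 'use the class default'."""

    def __init__(self, host=None, port=None, virtual_host=None):
        self.host = self.DEFAULT_HOST if host is None else host
        self.port = self.DEFAULT_PORT if port is None else port
        self.virtual_host = (self.DEFAULT_VIRTUAL_HOST
                             if virtual_host is None else virtual_host)


defaults = SketchConnectionParameters()
custom = SketchConnectionParameters(host="rabbit-server1", port=5673)
```

Comparing against None rather than testing truthiness keeps legitimate falsy values, such as 0 or an empty string, from being replaced by a default.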

ConnectionParameters

The ConnectionParameters class allows you to specify the options needed when creating the object.

class pika.connection.ConnectionParameters(host=None, port=None, virtual_host=None, credentials=None, channel_max=None, frame_max=None, heartbeat_interval=None, ssl=None, ssl_options=None, connection_attempts=None, retry_delay=None, socket_timeout=None, locale=None, backpressure_detection=None)

Connection parameters object that is passed into the connection adapter upon construction.

Example:

import pika

# Set the connection parameters to connect to rabbit-server1 on port 5672
# on the / virtual host using the username "guest" and password "guest"
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('rabbit-server1',
                                       5672,
                                       '/',
                                       credentials)

URLParameters

The URLParameters class allows you to pass in an AMQP URL when creating the object. The base URL supports the host, port, virtual host, ssl, username and password; other options are passed in via query parameters.

class pika.connection.URLParameters(url)

Create a connection parameters object from the values in an AMQP URL.

Example:

import pika

# Set the connection parameters to connect to rabbit-server1 on port 5672
# on the / virtual host using the username "guest" and password "guest"
parameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F')
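
To show which part of an AMQP URL carries which value, the sketch below takes a URL apart using only the Python 3 standard library. split_amqp_url is a hypothetical helper written for this illustration and is not how Pika parses URLs; treating the amqps scheme as a request for SSL is an assumption based on the usual AMQP URI convention.

```python
from urllib.parse import parse_qs, unquote, urlparse


def split_amqp_url(url):
    """Hypothetical helper: break an AMQP URL into its components."""
    parts = urlparse(url)
    query = parse_qs(parts.query)
    return {
        "ssl": parts.scheme == "amqps",
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # Drop the leading slash, then decode: "%2F" becomes "/"
        "virtual_host": unquote(parts.path[1:]),
        # Keep the first value given for each query parameter
        "options": {key: values[0] for key, values in query.items()},
    }


info = split_amqp_url(
    "amqp://guest:guest@rabbit-server1:5672/%2F?backpressure_detection=t")
```

The virtual host is URL-encoded because a literal / in that position would otherwise be read as a path separator.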

TCP Backpressure

As of RabbitMQ 2.0, client side Channel.Flow has been removed [1]. Instead, the RabbitMQ broker uses TCP Backpressure to slow your client if it is delivering messages too fast. If you enable backpressure_detection in your connection parameters, Pika attempts to help you handle this situation by notifying you when it notices that too many frames have yet to be delivered. Register a callback function with the add_backpressure_callback method of any connection adapter, and your function will be called when Pika sees a backlog exceeding 10 times the average size of the frames you have been sending. You may change the notification multiplier by calling the set_backpressure_multiplier method with any integer value.

Example:

import pika

parameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F?backpressure_detection=t')
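
The threshold rule described above comes down to simple arithmetic. The function below is a hypothetical restatement of that rule, with invented parameter names; it is a sketch of the described behavior, not Pika's detection code.

```python
def backpressure_exceeded(bytes_sent, frames_sent, backlog_bytes, multiplier=10):
    """Hypothetical check: is the backlog above multiplier x average frame size?"""
    if frames_sent == 0:
        # No frames sent yet, so there is no average to compare against
        return False
    average_frame_size = bytes_sent / float(frames_sent)
    return backlog_bytes > average_frame_size * multiplier


# 100 frames totalling 50,000 bytes gives an average frame of 500 bytes,
# so the default threshold is 5,000 bytes of undelivered data.
below = backpressure_exceeded(50000, 100, backlog_bytes=4000)
above = backpressure_exceeded(50000, 100, backlog_bytes=6000)
relaxed = backpressure_exceeded(50000, 100, backlog_bytes=6000, multiplier=20)
```

Raising the multiplier makes the notification less sensitive, which is what the third call demonstrates.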

Available Adapters

The following connection adapters are available for connecting with RabbitMQ:

AsyncoreConnection

Note

It is recommended that you use SelectConnection; its method signatures are the same as those of AsyncoreConnection.

The AsyncoreConnection class is provided for legacy support and quicker porting from applications that used Pika version 0.5.2 and prior.

class pika.adapters.asyncore_connection.AsyncoreConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)
add_backpressure_callback(callback_method)

Call method “callback” when pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed.

Parameters:callback_method (method) – The callback when the connection is closed
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – The callback when the connection is opened
add_timeout(deadline, callback_method)

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:str

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number or pass in a channel number to use. If you specify a channel number, it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Parameters:
  • on_open_callback (method) – The callback when the channel is opened
  • channel_number (int) – The channel number to use, defaults to the next available.
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed.

is_closing

Returns True if the connection is in the process of closing.

is_open

Returns True if the connection is open.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set

BlockingConnection

The BlockingConnection creates a layer on top of Pika’s asynchronous core, providing methods that will block until their expected response has returned. Due to the asynchronous nature of the Basic.Deliver and Basic.Return calls from RabbitMQ to your application, you are still required to implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish.

Basic.Get is a blocking call which will either return the Method Frame, Header Frame and Body of a message, or it will return a Basic.GetEmpty frame as the Method Frame.

For more information on using the BlockingConnection, see BlockingChannel.

Publishing Example:

from pika.adapters import BlockingConnection
from pika import BasicProperties

# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)

# Send a message
channel.basic_publish(exchange='',
                      routing_key="test",
                      body="Hello World!",
                      properties=BasicProperties(content_type="text/plain",
                                                 delivery_mode=1))

Consuming Example:

from pika.adapters import BlockingConnection

# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="test", durable=True,
                      exclusive=False, auto_delete=False)

# Start our counter at 0
messages = 0

# Method that will receive our messages and stop consuming after 10
def _on_message(channel, method, header, body):
    print("Message:")
    print("\t%r" % method)
    print("\t%r" % header)
    print("\t%r" % body)

    # Acknowledge message receipt
    channel.basic_ack(method.delivery_tag)

    # We've received 10 messages, stop consuming
    global messages
    messages += 1
    if messages >= 10:
        channel.stop_consuming()

# Set up our consumer callback
channel.basic_consume(_on_message, queue="test")

# This is blocking until channel.stop_consuming is called and will allow us to receive messages
channel.start_consuming()
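
The message counter can also be kept on an object instead of in a global variable. The sketch below shows one possible way to do that; CountingConsumer, StubChannel and StubMethod are hypothetical names, and the two stubs only record calls so that the logic can be exercised without a broker.

```python
class CountingConsumer(object):
    """Acknowledges each message and stops consuming after `limit` of them."""

    def __init__(self, limit):
        self.limit = limit
        self.messages = 0

    def on_message(self, channel, method, header, body):
        channel.basic_ack(method.delivery_tag)
        self.messages += 1
        if self.messages >= self.limit:
            channel.stop_consuming()


class StubMethod(object):
    """Hypothetical stand-in for a method frame; carries a delivery tag only."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


class StubChannel(object):
    """Hypothetical stand-in that records calls instead of using a broker."""

    def __init__(self):
        self.acked = []
        self.stopped = False

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def stop_consuming(self):
        self.stopped = True


consumer = CountingConsumer(limit=10)
stub_channel = StubChannel()
for tag in range(1, 11):
    consumer.on_message(stub_channel, StubMethod(tag), None, "body %d" % tag)
```

With a real channel, the bound method consumer.on_message would be passed wherever the example above passes _on_message.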

Implement a blocking, procedural style connection adapter on top of the asynchronous core.

class pika.adapters.blocking_connection.BlockingConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)

The BlockingConnection adapter is meant for simple implementations where you want to have blocking behavior. The behavior is layered on top of the async library. Because of the nature of AMQP there are a few callbacks one needs to do, even in a blocking implementation. These include receiving messages from Basic.Deliver, Basic.GetOk, and Basic.Return.

add_timeout(deadline, callback)

Add the callback to the IOLoop timer to fire after deadline seconds.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback (method) – The callback method
Return type:str

channel(channel_number=None)

Create a new channel with the next available or specified channel #.

Parameters:channel_number (int) – Specify the channel number
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
disconnect()

Disconnect from the socket

process_data_events()

Will make sure that data events are processed. Your app can block on this method.

process_timeouts()

Process the self._timeouts event stack

remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Parameters:timeout_id (str) – The id of the timeout to remove
send_method(channel_number, method_frame, content=None)

Constructs an RPC method frame and then sends it to the broker.

Parameters:
  • channel_number (int) – The channel number for the frame
  • method_frame (pika.object.Method) – The method frame to send
  • content (tuple) – If set, the content frame, given as a tuple of properties and body.
sleep(duration)

A safer way to sleep than calling time.sleep() directly: it keeps the adapter from ignoring frames sent from RabbitMQ while it waits. The connection will “sleep”, or block, for the number of seconds specified in duration, in small intervals.

Parameters:duration (int) – The time to sleep
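
The idea of sleeping in small intervals while still servicing events can be sketched as follows. sleep_with_events and its interval parameter are hypothetical and written only for this illustration; the process_events argument stands in for whatever the adapter does to handle incoming frames.

```python
import time


def sleep_with_events(duration, process_events, interval=0.01):
    """Hypothetical sketch: block for `duration` seconds in short slices,
    calling process_events() between slices. Returns the number of calls."""
    deadline = time.time() + duration
    calls = 0
    while True:
        process_events()
        calls += 1
        remaining = deadline - time.time()
        if remaining <= 0:
            return calls
        # Never sleep past the deadline
        time.sleep(min(interval, remaining))


seen = []
calls = sleep_with_events(0.05, lambda: seen.append(time.time()))
```

A single time.sleep(0.05) would block for the same total time but would give the event handler no chance to run until it returned.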
add_backpressure_callback(callback_method)

Call method “callback” when pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed.

Parameters:callback_method (method) – The callback when the connection is closed
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – The callback when the connection is opened
basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed.

is_closing

Returns True if the connection is in the process of closing.

is_open

Returns True if the connection is open.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set

SelectConnection

Note

SelectConnection is the recommended method for using Pika under most circumstances. It supports multiple event notification methods including select, epoll, kqueue and poll.

By default, SelectConnection will attempt to use the most appropriate event notification method for your system. To override the default behavior, you may set the poller type by assigning a string value to the select_connection module’s POLLER_TYPE attribute prior to creating the SelectConnection object instance. Valid values are: kqueue, poll, epoll, select.

Poller Type Override Example:

from pika.adapters import select_connection

select_connection.POLLER_TYPE = 'epoll'
connection = select_connection.SelectConnection()
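
Which pollers exist depends on the platform, and this can be checked with the standard select module. The sketch below shows one possible selection strategy; choose_poller is a hypothetical function, and the preference order in VALID_POLLERS is an assumption made for this illustration, not necessarily the order Pika uses.

```python
import select

# Assumed preference order for this sketch only
VALID_POLLERS = ("epoll", "kqueue", "poll", "select")


def choose_poller(requested=None):
    """Hypothetical helper: honor an explicit choice, else pick the first
    poller that the running platform provides."""
    if requested is not None:
        if requested not in VALID_POLLERS:
            raise ValueError("unknown poller type: %r" % (requested,))
        if not hasattr(select, requested):
            raise ValueError("%s is not available on this platform" % requested)
        return requested
    for name in VALID_POLLERS:
        if hasattr(select, name):
            return name


auto = choose_poller()
forced = choose_poller("select")
```

Because select.select is available on every platform that has the select module, the automatic choice always finds a poller.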

See the Continuation-Passing Style example for an example of using SelectConnection.

A connection adapter that tries to use the best polling method for the platform pika is running on.

class pika.adapters.select_connection.SelectConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=True)

An asynchronous connection adapter that attempts to use the fastest event loop adapter for the given platform.

add_backpressure_callback(callback_method)

Call method “callback” when pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed.

Parameters:callback_method (method) – The callback when the connection is closed
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – The callback when the connection is opened
add_timeout(deadline, callback_method)

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:str

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number or pass in a channel number to use. If you specify a channel number, it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Parameters:
  • on_open_callback (method) – The callback when the channel is opened
  • channel_number (int) – The channel number to use, defaults to the next available.
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed.

is_closing

Returns True if the connection is in the process of closing.

is_open

Returns True if the connection is open.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set

TornadoConnection

Tornado is an open source version of the scalable, non-blocking web server and tools that power FriendFeed. For more information on tornado, visit http://tornadoweb.org

Since the Tornado IOLoop blocks once it is started, it is suggested that you use a timer to add Pika to your tornado.Application instance after the HTTPServer has started.

The following is a simple, non-working example of how to add Pika to the Tornado IOLoop without blocking other applications from doing so. To see a fully working example, see the Tornado Demo application in the examples.

Example:

import time

import tornado.httpserver
import tornado.ioloop
import tornado.web

from pika.adapters.tornado_connection import TornadoConnection

class PikaClient(object):
    def connect(self):
        # on_open_callback is the keyword used in the TornadoConnection signature
        self.connection = TornadoConnection(on_open_callback=self.on_connected)

    def on_connected(self, connection):
        """Called when we are fully connected to RabbitMQ"""
        pass

# Create our Tornado Application
application = tornado.web.Application([
    (r"/", ExampleHandler)
], **settings)

# Create our Pika Client
application.pika = PikaClient()

# Start the HTTPServer
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8080)

# Get a handle to the instance of IOLoop
ioloop = tornado.ioloop.IOLoop.instance()

# Add our Pika connect to the IOLoop since we loop on ioloop.start.
# Tornado takes an absolute time here; half a second from now is assumed.
ioloop.add_timeout(time.time() + 0.5, application.pika.connect)

# Start the IOLoop
ioloop.start()

Run pika on the Tornado IOLoop

class pika.adapters.tornado_connection.TornadoConnection(parameters=None, on_open_callback=None, stop_ioloop_on_close=False, custom_ioloop=None)

The TornadoConnection runs on the Tornado IOLoop. If you’re running the connection in a web app, make sure you set stop_ioloop_on_close to False, which is the default behavior for this adapter, otherwise the web app will stop taking requests.

add_timeout(deadline, callback_method)

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:str
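
The difference between the two conventions is a single addition: a relative delay in seconds becomes an absolute time by adding the current time. The helper below is hypothetical and exists only to make that conversion explicit; the now parameter is there so the result can be checked with a fixed clock.

```python
import time


def absolute_deadline(seconds_from_now, now=None):
    """Hypothetical helper: convert a relative delay into an absolute
    timestamp on the time.time() scale."""
    if now is None:
        now = time.time()
    return now + seconds_from_now


# With the clock pinned at 1000.0, a half-second delay is due at 1000.5
deadline = absolute_deadline(0.5, now=1000.0)
```

The adapter's add_timeout takes the relative form (0.5 in this case), while the absolute form is what Tornado's own timer expects.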

remove_timeout(timeout_id)

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Return type:str
add_backpressure_callback(callback_method)

Call method “callback” when pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method)

Add a callback notification when the connection has closed.

Parameters:callback_method (method) – The callback when the connection is closed
add_on_open_callback(callback_method)

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – The callback when the connection is opened
basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number or pass in a channel number to use. If you specify a channel number, it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Parameters:
  • on_open_callback (method) – The callback when the channel is opened
  • channel_number (int) – The channel number to use, defaults to the next available.
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns True if the connection is closed.

is_closing

Returns True if the connection is in the process of closing.

is_open

Returns True if the connection is open.

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set

Footnotes

[1] “more effective flow control mechanism that does not require cooperation from clients and reacts quickly to prevent the broker from exhausting memory - see http://www.rabbitmq.com/extensions.html#memsup” from http://lists.rabbitmq.com/pipermail/rabbitmq-announce/attachments/20100825/2c672695/attachment.txt