BlockingConnection

The blocking connection adapter module implements blocking semantics on top of Pika’s core AMQP driver. While most of the asynchronous expectations are removed when using the blocking connection adapter, it attempts to remain true to the asynchronous RPC nature of the AMQP protocol, supporting server sent RPC commands.

The user facing classes in the module consist of the BlockingConnection and the BlockingChannel classes.

Be sure to check out examples in Usage Examples.

class pika.adapters.blocking_connection.BlockingConnection(parameters=None)[source]

The BlockingConnection creates a layer on top of Pika’s asynchronous core providing methods that will block until their expected response has returned. Due to the asynchronous nature of the Basic.Deliver and Basic.Return calls from RabbitMQ to your application, you can still implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish .

Basic.Get is a blocking call which will either return the Method Frame, Header Frame and Body of a message, or it will return a Basic.GetEmpty frame as the method frame.

For more information about communicating with the blocking_connection adapter, be sure to check out the BlockingChannel class which implements the Channel based communication for the blocking_connection adapter.

add_backpressure_callback(callback_method)

Call method “callback” when pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method_unused)[source]

This is not supported in BlockingConnection. When a connection is closed in BlockingConnection, a pika.exceptions.ConnectionClosed exception will raised instead.

Parameters:callback_method_unused (method) – Unused
Raises:NotImplementedError
add_on_open_callback(callback_method_unused)[source]

This method is not supported in BlockingConnection.

Parameters:callback_method_unused (method) – Unused
Raises:NotImplementedError
add_on_open_error_callback(callback_method_unused, remove_default=False)[source]

This method is not supported in BlockingConnection.

A pika.exceptions.AMQPConnectionError will be raised instead.

Parameters:callback_method_unused (method) – Unused
Raises:NotImplementedError
add_timeout(deadline, callback_method)[source]

Add the callback_method to the IOLoop timer to fire after deadline seconds. Returns a handle to the timeout. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
Return type:

str

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(channel_number=None)[source]

Create a new channel with the next available or specified channel #.

Parameters:channel_number (int) – Specify the channel number
close(reply_code=200, reply_text='Normal shutdown')[source]

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
connect()[source]

Invoke if trying to reconnect to a RabbitMQ server. Constructing the Connection object should connect on its own.

consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns a boolean reporting the current connection state.

is_closing

Returns a boolean reporting the current connection state.

is_open

Returns a boolean reporting the current connection state.

process_data_events()[source]

Will make sure that data events are processed. Your app can block on this method.

process_timeouts()[source]

Process the self._timeouts event stack

publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)[source]

Remove the timeout from the IOLoop by the ID returned from add_timeout.

Parameters:timeout_id (str) – The id of the timeout to remove
send_method(channel_number, method_frame, content=None)[source]

Constructs a RPC method frame and then sends it to the broker.

Parameters:
  • channel_number (int) – The channel number for the frame
  • method_frame (pika.object.Method) – The method frame to send
  • content (tuple) – If set, is a content frame, is tuple of properties and body.
set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set
sleep(duration)[source]

A safer way to sleep than calling time.sleep() directly which will keep the adapter from ignoring frames sent from RabbitMQ. The connection will “sleep” or block the number of seconds specified in duration in small intervals.

Parameters:duration (int) – The time to sleep
class pika.adapters.blocking_connection.BlockingChannel(connection, channel_number)[source]

The BlockingChannel implements blocking semantics for most things that one would use callback-passing-style for with the Channel class. In addition, the BlockingChannel class implements a generator that allows you to consume messages without using callbacks.

Example of creating a BlockingChannel:

import pika

# Create our connection object
connection = pika.BlockingConnection()

# The returned object will be a blocking channel
channel = connection.channel()
Parameters:
  • connection (BlockingConnection) – The connection
  • channel_number (int) – The channel number for this instance
add_callback(callback, replies, one_shot=True)

Pass in a callback handler and a list replies from the RabbitMQ broker which you’d like the callback notified of. Callbacks should allow for the frame parameter to be passed in.

Parameters:
  • callback (method) – The method to call
  • replies (list) – The replies to get a callback for
  • one_shot (bool) – Only handle the first type callback
add_on_cancel_callback(callback)

Pass a callback function that will be called when the basic_cancel is sent by the server. The callback function should receive a frame parameter.

Parameters:callback (method) – The method to call on callback
add_on_close_callback(callback)

Pass a callback function that will be called when the channel is closed. The callback function will receive the channel, the reply_code (int) and the reply_text (int) sent by the server describing why the channel was closed.

Parameters:callback (method) – The method to call on callback
add_on_flow_callback(callback)

Pass a callback function that will be called when Channel.Flow is called by the remote server. Note that newer versions of RabbitMQ will not issue this but instead use TCP backpressure

Parameters:callback (method) – The method to call on callback
add_on_return_callback(callback)

Pass a callback function that will be called when basic_publish as sent a message that has been rejected and returned by the server. The callback handler should receive a method, header and body frame. The base signature for the callback should be the same as the method signature one creates for a basic_consume callback.

Parameters:callback (method) – The method to call on callback
basic_ack(delivery_tag=0, multiple=False)

Acknowledge one or more messages. When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.

Parameters:
  • delivery-tag (int) – The server-assigned delivery tag
  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.
basic_cancel(consumer_tag='', nowait=False)[source]

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.

Parameters:
  • consumer_tag (str) – Identifier for the consumer
  • nowait (bool) – Do not expect a Basic.CancelOk response
basic_consume(consumer_callback, queue='', no_ack=False, exclusive=False, consumer_tag=None, arguments=None)

Sends the AMQP command Basic.Consume to the broker and binds messages for the consumer_tag to the consumer callback. If you do not pass in a consumer_tag, one will be automatically generated for you. Returns the consumer tag.

For more information on basic_consume, see: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume

Parameters:
  • consumer_callback (method) – The method to callback when consuming
  • queue (str or unicode) – The queue to consume from
  • no_ack (bool) – Tell the broker to not expect a response
  • exclusive (bool) – Don’t allow other consumers on the queue
  • consumer_tag (str or unicode) – Specify your own consumer tag
  • arguments (dict) – Custom key/value pair arguments for the consume
Return type:

str

basic_get(queue=None, no_ack=False)[source]

Get a single message from the AMQP broker. The callback method signature should have 3 parameters: The method frame, header frame and the body, like the consumer callback for Basic.Consume.

Parameters:
  • queue (str or unicode) – The queue to get a message from
  • no_ack (bool) – Tell the broker to not expect a reply
Return type:

(None, None, None)|(spec.Basic.Get, spec.Basic.Properties, str or unicode)

basic_nack(delivery_tag=None, multiple=False, requeue=True)

This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:
  • delivery-tag (int) – The server-assigned delivery tag
  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.
  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
basic_publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)[source]

Publish to the channel with the given exchange, routing key and body. Returns a boolean value indicating the success of the operation. For more information on basic_publish and what the parameters do, see:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish

Parameters:
  • exchange (str or unicode) – The exchange to publish to
  • routing_key (str or unicode) – The routing key to bind on
  • body (str or unicode) – The message body
  • properties (pika.spec.Properties) – Basic.properties
  • mandatory (bool) – The mandatory flag
  • immediate (bool) – The immediate flag
basic_qos(prefetch_size=0, prefetch_count=0, all_channels=False)[source]

Specify quality of service. This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement.

Parameters:
  • prefetch_size (int) – This field specifies the prefetch window size. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.
  • prefetch_count (int) – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.
  • all_channels (bool) – Should the QoS apply to all channels
basic_recover(requeue=False)[source]

This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.

Parameters:requeue (bool) – If False, the message will be redelivered to the original recipient. If True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
basic_reject(delivery_tag=None, requeue=True)

Reject an incoming message. This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:
  • delivery-tag (int) – The server-assigned delivery tag
  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
cancel()[source]

Cancel the consumption of a queue, rejecting all pending messages. This should only work with the generator based BlockingChannel.consume method. If you’re looking to cancel a consumer issues with BlockingChannel.basic_consume then you should call BlockingChannel.basic_cancel.

Return int:The number of messages requeued by Basic.Nack
close(reply_code=0, reply_text='Normal Shutdown')[source]

Will invoke a clean shutdown of the channel with the AMQP Broker.

Parameters:
  • reply_code (int) – The reply code to close the channel with
  • reply_text (str) – The reply text to close the channel with
confirm_delivery(nowait=False)[source]

Turn on Confirm mode in the channel.

For more information see:
http://www.rabbitmq.com/extensions.html#confirms
Parameters:nowait (bool) – Do not send a reply frame (Confirm.SelectOk)
consume(queue, no_ack=False, exclusive=False)[source]

Blocking consumption of a queue instead of via a callback. This method is a generator that returns messages a tuple of method, properties, and body.

Example:

for method, properties, body in channel.consume(‘queue’):
print body channel.basic_ack(method.delivery_tag)

You should call BlockingChannel.cancel() when you escape out of the generator loop. Also note this turns on forced data events to make sure that any acked messages actually get acked.

Parameters:
  • queue (str or unicode) – The queue name to consume
  • no_ack (bool) – Tell the broker to not expect a response
  • exclusive (bool) – Don’t allow other consumers on the queue
Return type:

tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode)

consumer_tags

Property method that returns a list of currently active consumers

Return type:list
exchange_bind(destination=None, source=None, routing_key='', nowait=False, arguments=None)[source]

Bind an exchange to another exchange.

Parameters:
  • destination (str or unicode) – The destination exchange to bind
  • source (str or unicode) – The source exchange to bind to
  • routing_key (str or unicode) – The routing key to bind on
  • nowait (bool) – Do not wait for an Exchange.BindOk
  • arguments (dict) – Custom key/value pair arguments for the binding
exchange_declare(exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None, type=None)[source]

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).

Parameters:
  • exchange (str or unicode) – The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
  • exchange_type (str) – The exchange type to use
  • passive (bool) – Perform a declare or just check to see if it exists
  • durable (bool) – Survive a reboot of RabbitMQ
  • auto_delete (bool) – Remove when no more queues are bound to it
  • internal (bool) – Can only be published to by other exchanges
  • nowait (bool) – Do not expect an Exchange.DeclareOk response
  • arguments (dict) – Custom key/value pair arguments for the exchange
  • type (str) – The deprecated exchange type parameter
exchange_delete(exchange=None, if_unused=False, nowait=False)[source]

Delete the exchange.

Parameters:
  • exchange (str or unicode) – The exchange name
  • if_unused (bool) – only delete if the exchange is unused
  • nowait (bool) – Do not wait for an Exchange.DeleteOk
exchange_unbind(destination=None, source=None, routing_key='', nowait=False, arguments=None)[source]

Unbind an exchange from another exchange.

Parameters:
  • destination (str or unicode) – The destination exchange to unbind
  • source (str or unicode) – The source exchange to unbind from
  • routing_key (str or unicode) – The routing key to unbind
  • nowait (bool) – Do not wait for an Exchange.UnbindOk
  • arguments (dict) – Custom key/value pair arguments for the binding
flow(callback, active)

Turn Channel flow control off and on. Pass a callback to be notified of the response from the server. active is a bool. Callback should expect a bool in response indicating channel flow state. For more information, please reference:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow

Parameters:
  • callback (method) – The callback method
  • active (bool) – Turn flow on or off
force_data_events(enable)[source]

Turn on and off forcing the blocking adapter to stop and look to see if there are any frames from RabbitMQ in the read buffer. By default the BlockingChannel will check for a read after every RPC command which can cause performance to degrade in scenarios where you do not care if RabbitMQ is trying to send RPC commands to your client connection.

Examples of RPC commands of this sort are:

  • Heartbeats
  • Connection.Close
  • Channel.Close
  • Basic.Return
  • Basic.Ack and Basic.Nack when using delivery confirmations

Turning off forced data events can be a bad thing and prevents your client from properly communicating with RabbitMQ. Forced data events were added in 0.9.6 to enforce proper channel behavior when communicating with RabbitMQ.

Note that the BlockingConnection also has the constant WRITE_TO_READ_RATIO which forces the connection to stop and try and read after writing the number of frames specified in the constant. This is a way to force the client to received these types of frames in a very publish/write IO heavy workload.

Parameters:enable (bool) – Set to False to disable
is_closed

Returns True if the channel is closed.

Return type:bool
is_closing

Returns True if the channel is closing.

Return type:bool
is_open

Returns True if the channel is open.

Return type:bool
open()[source]

Open the channel

queue_bind(queue, exchange, routing_key=None, nowait=False, arguments=None)[source]

Bind the queue to the specified exchange

Parameters:
  • queue (str or unicode) – The queue to bind to the exchange
  • exchange (str or unicode) – The source exchange to bind to
  • routing_key (str or unicode) – The routing key to bind on
  • nowait (bool) – Do not wait for a Queue.BindOk
  • arguments (dict) – Custom key/value pair arguments for the binding
queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, nowait=False, arguments=None)[source]

Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Leave the queue name empty for a auto-named queue in RabbitMQ

Parameters:
  • queue (str or unicode) – The queue name
  • passive (bool) – Only check to see if the queue exists
  • durable (bool) – Survive reboots of the broker
  • exclusive (bool) – Only allow access by the current connection
  • auto_delete (bool) – Delete after consumer cancels or disconnects
  • nowait (bool) – Do not wait for a Queue.DeclareOk
  • arguments (dict) – Custom key/value arguments for the queue
queue_delete(queue='', if_unused=False, if_empty=False, nowait=False)[source]

Delete a queue from the broker.

Parameters:
  • queue (str or unicode) – The queue to delete
  • if_unused (bool) – only delete if it’s unused
  • if_empty (bool) – only delete if the queue is empty
  • nowait (bool) – Do not wait for a Queue.DeleteOk
queue_purge(queue='', nowait=False)[source]

Purge all of the messages from the specified queue

Parameters:
  • queue (str or unicode) – The queue to purge
  • nowait (bool) – Do not expect a Queue.PurgeOk response
queue_unbind(queue='', exchange=None, routing_key=None, arguments=None)[source]

Unbind a queue from an exchange.

Parameters:
  • queue (str or unicode) – The queue to unbind from the exchange
  • exchange (str or unicode) – The source exchange to bind from
  • routing_key (str or unicode) – The routing key to unbind
  • arguments (dict) – Custom key/value pair arguments for the binding
start_consuming()[source]

Starts consuming from registered callbacks.

stop_consuming(consumer_tag=None)[source]

Sends off the Basic.Cancel to let RabbitMQ know to stop consuming and sets our internal state to exit out of the basic_consume.

tx_commit()[source]

Commit a transaction.

tx_rollback()[source]

Rollback a transaction.

tx_select()[source]

Select standard transaction mode. This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.