BlockingConnection

The blocking connection adapter module implements blocking semantics on top of Pika’s core AMQP driver. While most of the asynchronous expectations are removed when using the blocking connection adapter, it attempts to remain true to the asynchronous RPC nature of the AMQP protocol, supporting server-sent RPC commands.

The user-facing classes in the module are BlockingConnection and BlockingChannel.

Be sure to check out examples in Usage Examples.

class pika.adapters.blocking_connection.BlockingConnection(parameters=None, _impl_class=None)

The BlockingConnection creates a layer on top of Pika’s asynchronous core, providing methods that block until their expected response has returned. Because the Basic.Deliver and Basic.Return calls from RabbitMQ to your application are asynchronous, you can still implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or be notified of a delivery failure when using basic_publish.

For more information about communicating with the blocking_connection adapter, be sure to check out the BlockingChannel class which implements the Channel based communication for the blocking_connection adapter.

add_on_connection_blocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Blocked frame indicating that RabbitMQ is low on resources. Publishers can use this to voluntarily suspend publishing, instead of relying on back pressure throttling. The callback will be passed the Connection.Blocked method frame.

Parameters:callback_method (method) – Callback to call on Connection.Blocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Blocked
add_on_connection_unblocked_callback(callback_method)

Add a callback to be notified when RabbitMQ has sent a Connection.Unblocked frame letting publishers know it’s ok to start publishing again. The callback will be passed the Connection.Unblocked method frame.

Parameters:callback_method (method) – Callback to call on Connection.Unblocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Unblocked
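
As a rough illustration of how the two callbacks above might be used together, the sketch below keeps a flag that a publisher can consult before sending. PublishGate and its publish method are hypothetical helpers, not part of pika; the connection and channel are assumed to be a BlockingConnection and one of its channels.

```python
class PublishGate(object):
    """Tracks whether the broker has asked publishers to pause.

    Hypothetical helper for illustration; not part of pika.
    """

    def __init__(self, connection):
        self.blocked = False
        # Register for both notifications on the given connection.
        connection.add_on_connection_blocked_callback(self._on_blocked)
        connection.add_on_connection_unblocked_callback(self._on_unblocked)

    def _on_blocked(self, method_frame):
        # Called with the Connection.Blocked method frame.
        self.blocked = True

    def _on_unblocked(self, method_frame):
        # Called with the Connection.Unblocked method frame.
        self.blocked = False

    def publish(self, channel, exchange, routing_key, body):
        """Publish unless the broker is blocked; return True if sent."""
        if self.blocked:
            return False
        channel.basic_publish(exchange, routing_key, body)
        return True
```

A caller that gets False back can queue the message locally and retry once the flag clears.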
add_timeout(deadline, callback_method)

Create a single-shot timer that fires after deadline seconds. Do not confuse this with Tornado’s timeout, where you pass in the absolute time at which you want your callback called; here, pass in only the number of seconds to wait before the callback is called.

NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.

Parameters:
  • deadline (float) – The number of seconds to wait to call callback
  • callback_method (callable) – The callback method with the signature callback_method()
Returns:

opaque timer id

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
basic_nack_supported

Specifies if the server supports basic.nack on the active connection.

Return type:bool
channel(channel_number=None)

Create a new channel with the next available channel number, or pass in a channel number to use. A channel number you specify must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Return type:pika.adapters.blocking_connection.BlockingChannel
close(reply_code=200, reply_text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
consumer_cancel_notify_supported

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
exchange_exchange_bindings_supported

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
is_closed

Returns a boolean reporting whether the connection is closed.

is_closing

Returns a boolean reporting whether the connection is in the process of closing.

is_open

Returns a boolean reporting whether the connection is open.

process_data_events(time_limit=0)

Makes sure that data events are processed. Dispatches timer and channel callbacks if not called from the scope of a BlockingConnection or BlockingChannel callback. Your app can block on this method.

Parameters:time_limit (float) – suggested upper bound on processing time in seconds. The actual blocking time depends on the granularity of the underlying ioloop. Zero means return as soon as possible. None means there is no limit on processing time and the function will block until I/O produces actionable events. Defaults to 0 for backward compatibility. This parameter is NEW in pika 0.10.0.
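
One way an application might block on process_data_events is to call it in a loop until some condition of its own is met. The sketch below assumes a hypothetical helper named pump_until and a caller-supplied is_done predicate; only process_data_events and its time_limit parameter come from the adapter.

```python
import time


def pump_until(connection, is_done, max_seconds, time_limit=0.5):
    """Process I/O until is_done() is true or max_seconds elapse.

    Hypothetical helper; returns True if is_done() became true in time.
    """
    deadline = time.time() + max_seconds
    while not is_done():
        if time.time() >= deadline:
            return False
        # Each call dispatches any ready timer and consumer callbacks.
        connection.process_data_events(time_limit=time_limit)
    return True
```

Using a small non-zero time_limit keeps the loop from spinning while still letting the overall deadline be checked regularly.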
publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
publisher_confirms_supported

Specifies if the active connection can use publisher confirmations.

Return type:bool
remove_timeout(timeout_id)

Remove a timer if it’s still in the timeout stack

Parameters:timeout_id – The opaque timer id to remove
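
The timer methods add_timeout and remove_timeout can be combined into a restartable watchdog. The following is a minimal sketch under the assumption that the timer id returned by add_timeout is passed back unchanged to remove_timeout; the Watchdog class itself is hypothetical and not part of pika.

```python
class Watchdog(object):
    """Restartable single-shot timer built on add_timeout/remove_timeout.

    Hypothetical helper for illustration; not part of pika.
    """

    def __init__(self, connection, seconds, on_expire):
        self._connection = connection
        self._seconds = seconds
        self._on_expire = on_expire
        self._timer_id = None

    def start(self):
        # Restarting replaces any timer that is still pending.
        self.cancel()
        self._timer_id = self._connection.add_timeout(self._seconds,
                                                      self._fire)

    def cancel(self):
        if self._timer_id is not None:
            self._connection.remove_timeout(self._timer_id)
            self._timer_id = None

    def _fire(self):
        # The timer is single-shot, so forget its id before notifying.
        self._timer_id = None
        self._on_expire()
```

As with any timer on this adapter, the callback only runs while the application is inside process_data_events or start_consuming.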
sleep(duration)

A safer way to sleep than calling time.sleep() directly, because it keeps the adapter from ignoring frames sent from the broker. The connection will “sleep”, or block, for the number of seconds specified in duration, in small intervals.

Parameters:duration (float) – The time to sleep in seconds
class pika.adapters.blocking_connection.BlockingChannel(channel_impl, connection)

The BlockingChannel implements blocking semantics for most things that one would use callback-passing-style for with the Channel class. In addition, the BlockingChannel class implements a generator that allows you to consume messages without using callbacks.

Example of creating a BlockingChannel:

import pika

# Create our connection object
connection = pika.BlockingConnection()

# The returned object will be a synchronous channel
channel = connection.channel()
add_on_cancel_callback(callback)

Pass a callback function that will be called when Basic.Cancel is sent by the broker. The callback function should receive a method frame parameter.

Parameters:callback (callable) – a callable for handling broker’s Basic.Cancel notification with the call signature: callback(method_frame) where method_frame is of type pika.frame.Method with method of type spec.Basic.Cancel
add_on_return_callback(callback)

Pass a callback function that will be called when a published message is rejected and returned by the server via Basic.Return.

Parameters:callback (callable) – The callback to invoke, with the signature callback(channel, method, properties, body), where:
  • channel: pika.Channel
  • method: pika.spec.Basic.Return
  • properties: pika.spec.BasicProperties
  • body: str, unicode, or bytes (python 3.x)
basic_ack(delivery_tag=0, multiple=False)

Acknowledge one or more messages. When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.

Parameters:
  • delivery_tag (int) – The server-assigned delivery tag
  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.
basic_cancel(consumer_tag)

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.

NOTE: When cancelling a no_ack=False consumer, this implementation automatically Nacks and suppresses any incoming messages that have not yet been dispatched to the consumer’s callback. However, when cancelling a no_ack=True consumer, this method will return any pending messages that arrived before broker confirmed the cancellation.

Parameters:consumer_tag (str) – Identifier for the consumer; the result of passing a consumer_tag that was created on another channel is undefined (bad things will happen)
Returns:(NEW IN pika 0.10.0) empty sequence for a no_ack=False consumer; for a no_ack=True consumer, returns a (possibly empty) sequence of pending messages that arrived before broker confirmed the cancellation (this is done instead of via consumer’s callback in order to prevent reentrancy/recursion). Each message is a four-tuple (channel, method, properties, body), where:
  • channel: BlockingChannel
  • method: spec.Basic.Deliver
  • properties: spec.BasicProperties
  • body: str or unicode
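
Because a no_ack=True consumer gets its pending messages back as the return value rather than through its callback, a caller may want to process them explicitly. A minimal sketch, assuming a hypothetical helper named cancel_and_drain and a caller-supplied handle function:

```python
def cancel_and_drain(channel, consumer_tag, handle):
    """Cancel a consumer and process any messages returned by basic_cancel.

    Hypothetical helper; handle(method, properties, body) is supplied
    by the caller. Returns the number of pending messages processed.
    """
    pending = channel.basic_cancel(consumer_tag)
    for _channel, method, properties, body in pending:
        handle(method, properties, body)
    return len(pending)
```

For a no_ack=False consumer the returned sequence is empty, so the helper simply returns 0.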
basic_consume(consumer_callback, queue, no_ack=False, exclusive=False, consumer_tag=None, arguments=None)

Sends the AMQP command Basic.Consume to the broker and binds messages for the consumer_tag to the consumer callback. If you do not pass in a consumer_tag, one will be automatically generated for you. Returns the consumer tag.

NOTE: the consumer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.

For more information about Basic.Consume, see: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume

Parameters:
  • consumer_callback (callable) –

    The function for dispatching messages to user, having the signature: consumer_callback(channel, method, properties, body)

    channel: BlockingChannel
    method: spec.Basic.Deliver
    properties: spec.BasicProperties
    body: str or unicode
  • queue (str or unicode) – The queue to consume from
  • no_ack (bool) – Tell the broker to not expect a response (i.e., no ack/nack)
  • exclusive (bool) – Don’t allow other consumers on the queue
  • consumer_tag (str or unicode) – You may specify your own consumer tag; if left empty, a consumer tag will be generated automatically
  • arguments (dict) – Custom key/value pair arguments for the consumer
Returns:

consumer tag

Return type:

str

Raises pika.exceptions.DuplicateConsumerTag: if a consumer with the given consumer_tag is already present.
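
As an illustration of the callback signature described above, the sketch below registers a consumer that acknowledges each message after handing its body to a caller-supplied function. The helper name consume_with_ack and the handle argument are hypothetical; the basic_consume and basic_ack calls follow the signatures documented in this section.

```python
def consume_with_ack(channel, queue, handle):
    """Register a consumer that acks each message after handle(body).

    Hypothetical helper; returns the consumer tag from basic_consume.
    """

    def on_message(ch, method, properties, body):
        handle(body)
        # Acknowledge only after the handler returned without raising.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return channel.basic_consume(on_message, queue, no_ack=False)
```

Nothing is delivered to on_message until the application enters start_consuming or process_data_events.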

basic_get(queue=None, no_ack=False)

Get a single message from the AMQP broker. Returns a sequence with the method frame, message properties, and body.

Parameters:
  • queue (str or unicode) – Name of queue to get a message from
  • no_ack (bool) – Tell the broker to not expect a reply
Returns:

a three-tuple; (None, None, None) if the queue was empty; otherwise (method, properties, body); NOTE: body may be None

Return type:

(None, None, None)|(spec.Basic.GetOk, spec.BasicProperties, str or unicode or None)
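
Since the body of a fetched message may itself be None, checking the method frame is a more reliable way to detect an empty queue. A small sketch, assuming a hypothetical helper named fetch_one:

```python
def fetch_one(channel, queue):
    """Fetch and ack a single message.

    Hypothetical helper; returns (True, body) if a message was
    fetched and (False, None) if the queue was empty.
    """
    method, properties, body = channel.basic_get(queue=queue, no_ack=False)
    if method is None:
        # (None, None, None) means the queue was empty.
        return (False, None)
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return (True, body)
```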

basic_nack(delivery_tag=None, multiple=False, requeue=True)

This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:
  • delivery_tag (int) – The server-assigned delivery tag
  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be rejected with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates rejection of all outstanding messages.
  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
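
A common use of basic_nack is to reject a message whose processing failed. The sketch below assumes a hypothetical helper named ack_or_nack and a caller-supplied handle function; whether to requeue is a policy choice left to the caller.

```python
def ack_or_nack(channel, method, body, handle, requeue=False):
    """Ack the message if handle(body) succeeds, otherwise nack it.

    Hypothetical helper; returns True on ack and False on nack.
    """
    try:
        handle(body)
    except Exception:
        # Reject just this message; requeue decides whether the broker
        # should try to deliver it again.
        channel.basic_nack(delivery_tag=method.delivery_tag,
                           multiple=False,
                           requeue=requeue)
        return False
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True
```

Requeueing a message that always fails can cause it to be redelivered indefinitely, which is why this sketch defaults to requeue=False.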
basic_publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)

Publish to the channel with the given exchange, routing key and body. Returns a boolean value indicating the success of the operation.

This is the legacy BlockingChannel method for publishing. See also BlockingChannel.publish, which provides more information about failures.

For more information on basic_publish and what the parameters do, see:

NOTE: mandatory and immediate may be enabled even without delivery confirmation, but in the absence of delivery confirmation the synchronous implementation has no way to know how long to wait for the Basic.Return or lack thereof.
Parameters:
  • exchange (str or unicode) – The exchange to publish to
  • routing_key (str or unicode) – The routing key to bind on
  • body (str or unicode) – The message body; empty string if no body
  • properties (pika.spec.BasicProperties) – message properties
  • mandatory (bool) – The mandatory flag
  • immediate (bool) – The immediate flag
Returns:

True if delivery confirmation is not enabled (NEW in pika 0.10.0); otherwise returns False if the message could not be delivered (Basic.nack and/or Basic.Return) and True if the message was delivered (Basic.ack and no Basic.Return)
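
The boolean result only carries information once delivery confirmation is on. The sketch below shows one way to act on it, assuming that channel.confirm_delivery() has already been called on the channel; the helper name publish_with_retry and the attempts parameter are hypothetical.

```python
def publish_with_retry(channel, exchange, routing_key, body, attempts=3):
    """Publish until the broker confirms delivery or attempts run out.

    Hypothetical helper. Assumes channel.confirm_delivery() was already
    called, so basic_publish returns False on Basic.Nack or Basic.Return.
    """
    for _ in range(attempts):
        if channel.basic_publish(exchange, routing_key, body,
                                 mandatory=True):
            return True
    return False
```

Without confirm_delivery, basic_publish returns True unconditionally and the loop exits after the first attempt.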

basic_qos(prefetch_size=0, prefetch_count=0, all_channels=False)

Specify quality of service. This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement.

Parameters:
  • prefetch_size (int) – This field specifies the prefetch window size. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set in the consumer.
  • prefetch_count (int) – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set in the consumer.
  • all_channels (bool) – Should the QoS apply to all channels
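
Since the prefetch limits are ignored for no-ack consumers, a prefetch window is usually paired with a consumer that acknowledges its messages. A minimal sketch, assuming a hypothetical helper named start_fair_worker and a caller-supplied on_message callback:

```python
def start_fair_worker(channel, queue, on_message, prefetch_count=1):
    """Limit unacknowledged deliveries, then register an acking consumer.

    Hypothetical helper; on_message has the basic_consume callback
    signature and is expected to ack each message it finishes.
    """
    # At most prefetch_count messages are sent ahead of acknowledgement.
    channel.basic_qos(prefetch_count=prefetch_count)
    # no_ack=False so the prefetch limit actually applies.
    return channel.basic_consume(on_message, queue, no_ack=False)
```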
basic_recover(requeue=False)

This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.

Parameters:requeue (bool) – If False, the message will be redelivered to the original recipient. If True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
basic_reject(delivery_tag=None, requeue=True)

Reject an incoming message. This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:
  • delivery_tag (int) – The server-assigned delivery tag
  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
cancel()

Cancel the queue consumer created by BlockingChannel.consume, rejecting all pending ackable messages.

NOTE: If you’re looking to cancel a consumer issued with BlockingChannel.basic_consume then you should call BlockingChannel.basic_cancel.

Return int:The number of messages requeued by Basic.Nack. NEW in 0.10.0: returns 0
channel_number

Channel number

close(reply_code=0, reply_text='Normal Shutdown')

Will invoke a clean shutdown of the channel with the AMQP Broker.

Parameters:
  • reply_code (int) – The reply code to close the channel with
  • reply_text (str) – The reply text to close the channel with
confirm_delivery()

Turn on RabbitMQ-proprietary Confirm mode in the channel.

For more information see:
http://www.rabbitmq.com/extensions.html#confirms
connection

The channel’s BlockingConnection instance

consume(queue, no_ack=False, exclusive=False, arguments=None, inactivity_timeout=None)

Blocking consumption of a queue instead of via a callback. This method is a generator that yields each message as a tuple of method, properties, and body. The active generator iterator terminates when the consumer is cancelled by client or broker.

Example:

for method, properties, body in channel.consume('queue'):
    print(body)
    channel.basic_ack(method.delivery_tag)

You should call BlockingChannel.cancel() when you escape out of the generator loop.

If you don’t cancel this consumer, then the next call on the same channel to consume() with the exact same (queue, no_ack, exclusive) parameters will resume the existing consumer generator; however, calling with different parameters will result in an exception.

Parameters:
  • queue (str or unicode) – The queue name to consume
  • no_ack (bool) – Tell the broker to not expect an ack/nack response
  • exclusive (bool) – Don’t allow other consumers on the queue
  • arguments (dict) – Custom key/value pair arguments for the consumer
  • inactivity_timeout (float) – if a number is given (in seconds), will cause the method to yield None after the given period of inactivity; this permits pseudo-regular maintenance activities to be carried out by the user while waiting for messages to arrive. If None is given (default), then the method blocks until the next event arrives. NOTE that timing granularity is limited by the timer resolution of the underlying implementation. NEW in pika 0.10.0.
Yields:

tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode)

Raises ValueError: if consumer-creation parameters don’t match those of the existing queue consumer generator, if any. NEW in pika 0.10.0
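
The inactivity_timeout parameter makes it possible to leave the generator loop once a queue has gone quiet. The following is a sketch under the assumption, stated above, that the generator yields None after the given period of inactivity; the helper name drain and the handle argument are hypothetical.

```python
def drain(channel, queue, handle, idle_seconds=1.0):
    """Consume until the queue has been idle for idle_seconds.

    Hypothetical helper; returns the number of messages handled.
    """
    count = 0
    for message in channel.consume(queue, inactivity_timeout=idle_seconds):
        if message is None:
            # No message arrived within idle_seconds.
            break
        method, properties, body = message
        handle(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        count += 1
    # Cancel the generator's consumer after leaving the loop.
    channel.cancel()
    return count
```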

exchange_bind(destination=None, source=None, routing_key='', arguments=None)

Bind an exchange to another exchange.

Parameters:
  • destination (str or unicode) – The destination exchange to bind
  • source (str or unicode) – The source exchange to bind to
  • routing_key (str or unicode) – The routing key to bind on
  • arguments (dict) – Custom key/value pair arguments for the binding
Returns:

Method frame from the Exchange.Bind-ok response

Return type:

pika.frame.Method having method attribute of type spec.Exchange.BindOk

exchange_declare(exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None, **kwargs)

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

If passive is set, the server will reply with Declare-Ok if an exchange with the same name already exists; if the exchange does not exist, the server MUST raise a channel exception with reply code 404 (not found).

Parameters:
  • exchange (str or unicode) – The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
  • exchange_type (str) – The exchange type to use
  • passive (bool) – Perform a declare or just check to see if it exists
  • durable (bool) – Survive a reboot of RabbitMQ
  • auto_delete (bool) – Remove when no more queues are bound to it
  • internal (bool) – Can only be published to by other exchanges
  • arguments (dict) – Custom key/value pair arguments for the exchange
  • type (str) – via kwargs: the deprecated exchange type parameter
Returns:

Method frame from the Exchange.Declare-ok response

Return type:

pika.frame.Method having method attribute of type spec.Exchange.DeclareOk

exchange_delete(exchange=None, if_unused=False)

Delete the exchange.

Parameters:
  • exchange (str or unicode) – The exchange name
  • if_unused (bool) – only delete if the exchange is unused
Returns:

Method frame from the Exchange.Delete-ok response

Return type:

pika.frame.Method having method attribute of type spec.Exchange.DeleteOk

exchange_unbind(destination=None, source=None, routing_key='', arguments=None)

Unbind an exchange from another exchange.

Parameters:
  • destination (str or unicode) – The destination exchange to unbind
  • source (str or unicode) – The source exchange to unbind from
  • routing_key (str or unicode) – The routing key to unbind
  • arguments (dict) – Custom key/value pair arguments for the binding
Returns:

Method frame from the Exchange.Unbind-ok response

Return type:

pika.frame.Method having method attribute of type spec.Exchange.UnbindOk

flow(active)

Turn Channel flow control off and on.

NOTE: RabbitMQ doesn’t support active=False; per https://www.rabbitmq.com/specification.html: “active=false is not supported by the server. Limiting prefetch with basic.qos provides much better control”

For more information, please reference:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow

Parameters:active (bool) – Turn flow on (True) or off (False)
Returns:True if broker will start or continue sending; False if not
Return type:bool
get_waiting_message_count()

Returns the number of messages that may be retrieved from the current queue consumer generator via BlockingChannel.consume without blocking. NEW in pika 0.10.0

Return type:int
is_closed

Returns True if the channel is closed.

Return type:bool
is_closing

Returns True if the channel is closing.

Return type:bool
is_open

Returns True if the channel is open.

Return type:bool
publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)

Publish to the channel with the given exchange, routing key, and body. Unlike the legacy BlockingChannel.basic_publish, this method provides more information about failures via exceptions.

For more information on basic_publish and what the parameters do, see:

NOTE: mandatory and immediate may be enabled even without delivery confirmation, but in the absence of delivery confirmation the synchronous implementation has no way to know how long to wait for the Basic.Return.
Parameters:
  • exchange (str or unicode) – The exchange to publish to
  • routing_key (str or unicode) – The routing key to bind on
  • body (str or unicode) – The message body; empty string if no body
  • properties (pika.spec.BasicProperties) – message properties
  • mandatory (bool) – The mandatory flag
  • immediate (bool) – The immediate flag
Raises:
  • UnroutableError – raised when a message published in publisher-acknowledgments mode (see BlockingChannel.confirm_delivery) is returned via Basic.Return followed by Basic.Ack.
  • NackError – raised when a message published in publisher-acknowledgements mode is Nack’ed by the broker. See BlockingChannel.confirm_delivery.
queue_bind(queue, exchange, routing_key=None, arguments=None)

Bind the queue to the specified exchange

Parameters:
  • queue (str or unicode) – The queue to bind to the exchange
  • exchange (str or unicode) – The source exchange to bind to
  • routing_key (str or unicode) – The routing key to bind on
  • arguments (dict) – Custom key/value pair arguments for the binding
Returns:

Method frame from the Queue.Bind-ok response

Return type:

pika.frame.Method having method attribute of type spec.Queue.BindOk

queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)

Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Leave the queue name empty for an auto-named queue in RabbitMQ.

Parameters:
  • queue (str or unicode) – The queue name; if empty string, the broker will create a unique queue name
  • passive (bool) – Only check to see if the queue exists
  • durable (bool) – Survive reboots of the broker
  • exclusive (bool) – Only allow access by the current connection
  • auto_delete (bool) – Delete after consumer cancels or disconnects
  • arguments (dict) – Custom key/value arguments for the queue
Returns:

Method frame from the Queue.Declare-ok response

Return type:

pika.frame.Method having method attribute of type spec.Queue.DeclareOk
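
When the queue name is left empty, the broker-generated name has to be read back from the Queue.Declare-ok frame before the queue can be bound. A minimal sketch, assuming a hypothetical helper named bind_private_queue and a topic exchange chosen purely for illustration:

```python
def bind_private_queue(channel, exchange, routing_key):
    """Declare an exchange and an auto-named exclusive queue, then bind them.

    Hypothetical helper; returns the broker-generated queue name.
    """
    channel.exchange_declare(exchange=exchange, exchange_type='topic')
    declared = channel.queue_declare(queue='', exclusive=True)
    # The Queue.DeclareOk method carries the name the broker chose.
    queue_name = declared.method.queue
    channel.queue_bind(queue=queue_name,
                       exchange=exchange,
                       routing_key=routing_key)
    return queue_name
```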

queue_delete(queue='', if_unused=False, if_empty=False)

Delete a queue from the broker.

Parameters:
  • queue (str or unicode) – The queue to delete
  • if_unused (bool) – only delete if it’s unused
  • if_empty (bool) – only delete if the queue is empty
Returns:

Method frame from the Queue.Delete-ok response

Return type:

pika.frame.Method having method attribute of type spec.Queue.DeleteOk

queue_purge(queue='')

Purge all of the messages from the specified queue

Parameters:queue (str or unicode) – The queue to purge
Returns:Method frame from the Queue.Purge-ok response
Return type:pika.frame.Method having method attribute of type spec.Queue.PurgeOk
queue_unbind(queue='', exchange=None, routing_key=None, arguments=None)

Unbind a queue from an exchange.

Parameters:
  • queue (str or unicode) – The queue to unbind from the exchange
  • exchange (str or unicode) – The source exchange to unbind from
  • routing_key (str or unicode) – The routing key to unbind
  • arguments (dict) – Custom key/value pair arguments for the binding
Returns:

Method frame from the Queue.Unbind-ok response

Return type:

pika.frame.Method having method attribute of type spec.Queue.UnbindOk

start_consuming()

Processes I/O events and dispatches timers and basic_consume callbacks until all consumers are cancelled.

NOTE: this blocking function may not be called from the scope of a pika callback, because dispatching basic_consume callbacks from this context would constitute recursion.

Raises pika.exceptions.RecursionError:
 if called from the scope of a BlockingConnection or BlockingChannel callback
stop_consuming(consumer_tag=None)

Cancels all consumers, signalling the start_consuming loop to exit.

NOTE: pending non-ackable messages will be lost; pending ackable messages will be rejected.
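
A frequently seen pattern is to call stop_consuming from inside a consumer callback so that start_consuming returns after a fixed number of messages. The sketch below assumes this is permitted from callback scope; the helper name consume_n and the handle argument are hypothetical.

```python
def consume_n(channel, queue, n, handle):
    """Handle and ack n messages, then leave the start_consuming loop.

    Hypothetical helper; returns the number of messages handled.
    """
    seen = []

    def on_message(ch, method, properties, body):
        handle(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        seen.append(method.delivery_tag)
        if len(seen) >= n:
            # Cancels the consumers, which lets start_consuming return.
            ch.stop_consuming()

    channel.basic_consume(on_message, queue)
    channel.start_consuming()
    return len(seen)
```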

tx_commit()

Commit a transaction.

Returns:Method frame from the Tx.Commit-ok response
Return type:pika.frame.Method having method attribute of type spec.Tx.CommitOk
tx_rollback()

Rollback a transaction.

Returns:Method frame from the Tx.Rollback-ok response
Return type:pika.frame.Method having method attribute of type spec.Tx.RollbackOk
tx_select()

Select standard transaction mode. This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.

Returns:Method frame from the Tx.Select-ok response
Return type:pika.frame.Method having method attribute of type spec.Tx.SelectOk
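
The three transaction methods are typically used together: select once, publish, then commit or roll back. A minimal sketch, assuming a hypothetical helper named publish_batch_in_tx that treats any exception raised while publishing as a reason to roll back:

```python
def publish_batch_in_tx(channel, exchange, routing_key, bodies):
    """Publish all bodies in one transaction, rolling back on error.

    Hypothetical helper for illustration; not part of pika.
    """
    # Put the channel in transaction mode before Commit or Rollback.
    channel.tx_select()
    try:
        for body in bodies:
            channel.basic_publish(exchange, routing_key, body)
    except Exception:
        # Discard everything published since the last commit.
        channel.tx_rollback()
        raise
    channel.tx_commit()
```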