connection

Note

The following class-level documentation is not intended for those using Pika in their applications. It is for those who are extending Pika or otherwise working on the driver itself. For an overview of how to use adapters, see the Connecting to RabbitMQ documentation.

Core connection objects

Parameters

class pika.connection.Parameters[source]

Base connection parameters class definition

Parameters:
  • DEFAULT_HOST (str) – ‘localhost’
  • DEFAULT_PORT (int) – 5672
  • DEFAULT_VIRTUAL_HOST (str) – ‘/’
  • DEFAULT_USERNAME (str) – ‘guest’
  • DEFAULT_PASSWORD (str) – ‘guest’
  • DEFAULT_HEARTBEAT_INTERVAL (int) – 0
  • DEFAULT_CHANNEL_MAX (int) – 0
  • DEFAULT_FRAME_MAX (int) – pika.spec.FRAME_MAX_SIZE
  • DEFAULT_LOCALE (str) – ‘en_US’
  • DEFAULT_CONNECTION_ATTEMPTS (int) – 1
  • DEFAULT_RETRY_DELAY (int|float) – 2.0
  • DEFAULT_SOCKET_TIMEOUT (int|float) – 0.25
  • DEFAULT_SSL (bool) – False
  • DEFAULT_SSL_OPTIONS (dict) – {}
  • DEFAULT_SSL_PORT (int) – 5671
  • DEFAULT_BACKPRESSURE_DETECTION (bool) – False
DEFAULT_BACKPRESSURE_DETECTION = False
DEFAULT_CONNECTION_ATTEMPTS = 1
DEFAULT_CHANNEL_MAX = 0
DEFAULT_FRAME_MAX = 131072
DEFAULT_HEARTBEAT_INTERVAL = 0
DEFAULT_HOST = 'localhost'
DEFAULT_LOCALE = 'en_US'
DEFAULT_PASSWORD = 'guest'
DEFAULT_PORT = 5672
DEFAULT_RETRY_DELAY = 2.0
DEFAULT_SOCKET_TIMEOUT = 0.25
DEFAULT_SSL = False
DEFAULT_SSL_OPTIONS = {}
DEFAULT_SSL_PORT = 5671
DEFAULT_USERNAME = 'guest'
DEFAULT_VIRTUAL_HOST = '/'
_credentials(username, password)[source]

Return a plain credentials object for the specified username and password.

Parameters:
  • username (str) – The username to use
  • password (str) – The password to use
Return type:

pika_credentials.PlainCredentials

_validate_backpressure(backpressure_detection)[source]

Validate that the backpressure detection option is a bool.

Parameters:backpressure_detection (bool) – The backpressure detection value
Return type:bool
Raises:TypeError
_validate_channel_max(channel_max)[source]

Validate that the channel_max value is an int

Parameters:channel_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
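
The _validate_* methods share one pattern: raise TypeError for a value of the wrong type, raise ValueError for a value that is out of range, and return True otherwise. The following is a minimal sketch of that pattern, not Pika's implementation. The function name validate_channel_max and the 0 to 65535 bounds are assumptions chosen for illustration.

```python
def validate_channel_max(channel_max, lower=0, upper=65535):
    """Hypothetical stand-in for a _validate_* method.

    The bounds are illustrative assumptions, not values taken from Pika.
    """
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(channel_max, bool) or not isinstance(channel_max, int):
        raise TypeError("channel_max must be an int")
    if not lower <= channel_max <= upper:
        raise ValueError(
            "channel_max must be between %d and %d" % (lower, upper))
    return True
```

Returning True on success lets a caller combine the check with an assignment in a single condition.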
_validate_connection_attempts(connection_attempts)[source]

Validate that the connection_attempts value is an int

Parameters:connection_attempts (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_credentials(credentials)[source]

Validate the credentials passed in are using a valid object type.

Parameters:credentials (pika.credentials.Credentials) – Credentials to validate
Return type:bool
Raises:TypeError
_validate_frame_max(frame_max)[source]
Validate that the frame_max value is an int and does not exceed
the maximum frame size and is not less than the frame min size.
Parameters:frame_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:InvalidMinimumFrameSize
_validate_heartbeat_interval(heartbeat_interval)[source]

Validate that the heartbeat_interval value is an int

Parameters:heartbeat_interval (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_host(host)[source]

Validate that the host value is a str

Parameters:host (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_locale(locale)[source]

Validate that the locale value is a str

Parameters:locale (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_port(port)[source]

Validate that the port value is an int

Parameters:port (int) – The value to validate
Return type:bool
Raises:TypeError
_validate_retry_delay(retry_delay)[source]

Validate that the retry_delay value is an int or float

Parameters:retry_delay (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_socket_timeout(socket_timeout)[source]

Validate that the socket_timeout value is an int or float

Parameters:socket_timeout (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_ssl(ssl)[source]

Validate the SSL toggle is a bool

Parameters:ssl (bool) – The SSL enabled/disabled value
Return type:bool
Raises:TypeError
_validate_ssl_options(ssl_options)[source]

Validate the SSL options value is a dictionary.

Parameters:ssl_options (dict|None) – SSL Options to validate
Return type:bool
Raises:TypeError
_validate_virtual_host(virtual_host)[source]

Validate that the virtual_host value is a str

Parameters:virtual_host (str) – The value to validate
Return type:bool
Raises:TypeError

ConnectionParameters

class pika.connection.ConnectionParameters(host=None, port=None, virtual_host=None, credentials=None, channel_max=None, frame_max=None, heartbeat_interval=None, ssl=None, ssl_options=None, connection_attempts=None, retry_delay=None, socket_timeout=None, locale=None, backpressure_detection=None)[source]

Connection parameters object that is passed into the connection adapter upon construction.
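
Every constructor argument defaults to None, and the class carries a DEFAULT_* attribute for each setting. One way to read that pairing is sketched below: an argument left as None falls back to the class-level default. This is an illustration under that assumption, not Pika's code, and the class name ParametersSketch is hypothetical.

```python
class ParametersSketch(object):
    """Hypothetical stand-in for the Parameters base class."""
    DEFAULT_HOST = 'localhost'
    DEFAULT_PORT = 5672
    DEFAULT_VIRTUAL_HOST = '/'

    def __init__(self, host=None, port=None, virtual_host=None):
        # None means "not supplied", so fall back to the class-level default
        self.host = self.DEFAULT_HOST if host is None else host
        self.port = self.DEFAULT_PORT if port is None else port
        self.virtual_host = (self.DEFAULT_VIRTUAL_HOST
                             if virtual_host is None else virtual_host)


params = ParametersSketch(host='rabbit.example.com')
print(params.host, params.port, params.virtual_host)
```

Only host is overridden here; port and virtual_host keep the defaults listed above.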

DEFAULT_BACKPRESSURE_DETECTION = False
DEFAULT_CHANNEL_MAX = 0
DEFAULT_CONNECTION_ATTEMPTS = 1
DEFAULT_FRAME_MAX = 131072
DEFAULT_HEARTBEAT_INTERVAL = 0
DEFAULT_HOST = 'localhost'
DEFAULT_LOCALE = 'en_US'
DEFAULT_PASSWORD = 'guest'
DEFAULT_PORT = 5672
DEFAULT_RETRY_DELAY = 2.0
DEFAULT_SOCKET_TIMEOUT = 0.25
DEFAULT_SSL = False
DEFAULT_SSL_OPTIONS = {}
DEFAULT_SSL_PORT = 5671
DEFAULT_USERNAME = 'guest'
DEFAULT_VIRTUAL_HOST = '/'
_credentials(username, password)

Return a plain credentials object for the specified username and password.

Parameters:
  • username (str) – The username to use
  • password (str) – The password to use
Return type:

pika_credentials.PlainCredentials

_validate_backpressure(backpressure_detection)

Validate that the backpressure detection option is a bool.

Parameters:backpressure_detection (bool) – The backpressure detection value
Return type:bool
Raises:TypeError
_validate_channel_max(channel_max)

Validate that the channel_max value is an int

Parameters:channel_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_connection_attempts(connection_attempts)

Validate that the connection_attempts value is an int

Parameters:connection_attempts (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_credentials(credentials)

Validate the credentials passed in are using a valid object type.

Parameters:credentials (pika.credentials.Credentials) – Credentials to validate
Return type:bool
Raises:TypeError
_validate_frame_max(frame_max)
Validate that the frame_max value is an int and does not exceed
the maximum frame size and is not less than the frame min size.
Parameters:frame_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:InvalidMinimumFrameSize
_validate_heartbeat_interval(heartbeat_interval)

Validate that the heartbeat_interval value is an int

Parameters:heartbeat_interval (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_host(host)

Validate that the host value is a str

Parameters:host (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_locale(locale)

Validate that the locale value is a str

Parameters:locale (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_port(port)

Validate that the port value is an int

Parameters:port (int) – The value to validate
Return type:bool
Raises:TypeError
_validate_retry_delay(retry_delay)

Validate that the retry_delay value is an int or float

Parameters:retry_delay (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_socket_timeout(socket_timeout)

Validate that the socket_timeout value is an int or float

Parameters:socket_timeout (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_ssl(ssl)

Validate the SSL toggle is a bool

Parameters:ssl (bool) – The SSL enabled/disabled value
Return type:bool
Raises:TypeError
_validate_ssl_options(ssl_options)

Validate the SSL options value is a dictionary.

Parameters:ssl_options (dict|None) – SSL Options to validate
Return type:bool
Raises:TypeError
_validate_virtual_host(virtual_host)

Validate that the virtual_host value is a str

Parameters:virtual_host (str) – The value to validate
Return type:bool
Raises:TypeError

URLParameters

class pika.connection.URLParameters(url)[source]

Create a connection parameters object from an AMQP URL.

_process_url(url)[source]

Take an AMQP URL and break it up into the various parameters.

Parameters:url (str) – The URL to parse
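
As a rough illustration of breaking an AMQP URL into parameters, the sketch below uses the standard library's urllib.parse. It is not the body of _process_url. The helper name split_amqp_url, the returned dict layout, and the example URL are assumptions; the fallback values mirror the DEFAULT_* attributes documented for Parameters.

```python
from urllib.parse import urlparse, unquote


def split_amqp_url(url):
    """Hypothetical helper: break an AMQP URL into a dict of parameters."""
    parts = urlparse(url)
    return {
        'host': parts.hostname or 'localhost',
        'port': parts.port or 5672,
        'username': unquote(parts.username) if parts.username else 'guest',
        'password': unquote(parts.password) if parts.password else 'guest',
        # Drop the leading slash, then decode; "%2F" decodes to the vhost "/"
        'virtual_host': unquote(parts.path[1:]) or '/',
    }


result = split_amqp_url('amqp://alice:secret@broker.example.com:5673/%2F')
print(result)
```

The virtual host is percent-encoded in the URL path, which is why the default vhost "/" appears as "%2F".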
DEFAULT_BACKPRESSURE_DETECTION = False
DEFAULT_CHANNEL_MAX = 0
DEFAULT_CONNECTION_ATTEMPTS = 1
DEFAULT_FRAME_MAX = 131072
DEFAULT_HEARTBEAT_INTERVAL = 0
DEFAULT_HOST = 'localhost'
DEFAULT_LOCALE = 'en_US'
DEFAULT_PASSWORD = 'guest'
DEFAULT_PORT = 5672
DEFAULT_RETRY_DELAY = 2.0
DEFAULT_SOCKET_TIMEOUT = 0.25
DEFAULT_SSL = False
DEFAULT_SSL_OPTIONS = {}
DEFAULT_SSL_PORT = 5671
DEFAULT_USERNAME = 'guest'
DEFAULT_VIRTUAL_HOST = '/'
_credentials(username, password)

Return a plain credentials object for the specified username and password.

Parameters:
  • username (str) – The username to use
  • password (str) – The password to use
Return type:

pika_credentials.PlainCredentials

_validate_backpressure(backpressure_detection)

Validate that the backpressure detection option is a bool.

Parameters:backpressure_detection (bool) – The backpressure detection value
Return type:bool
Raises:TypeError
_validate_channel_max(channel_max)

Validate that the channel_max value is an int

Parameters:channel_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_connection_attempts(connection_attempts)

Validate that the connection_attempts value is an int

Parameters:connection_attempts (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_credentials(credentials)

Validate the credentials passed in are using a valid object type.

Parameters:credentials (pika.credentials.Credentials) – Credentials to validate
Return type:bool
Raises:TypeError
_validate_frame_max(frame_max)
Validate that the frame_max value is an int and does not exceed
the maximum frame size and is not less than the frame min size.
Parameters:frame_max (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:InvalidMinimumFrameSize
_validate_heartbeat_interval(heartbeat_interval)

Validate that the heartbeat_interval value is an int

Parameters:heartbeat_interval (int) – The value to validate
Return type:bool
Raises:TypeError
Raises:ValueError
_validate_host(host)

Validate that the host value is a str

Parameters:host (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_locale(locale)

Validate that the locale value is a str

Parameters:locale (str) – The value to validate
Return type:bool
Raises:TypeError
_validate_port(port)

Validate that the port value is an int

Parameters:port (int) – The value to validate
Return type:bool
Raises:TypeError
_validate_retry_delay(retry_delay)

Validate that the retry_delay value is an int or float

Parameters:retry_delay (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_socket_timeout(socket_timeout)

Validate that the socket_timeout value is an int or float

Parameters:socket_timeout (int|float) – The value to validate
Return type:bool
Raises:TypeError
_validate_ssl(ssl)

Validate the SSL toggle is a bool

Parameters:ssl (bool) – The SSL enabled/disabled value
Return type:bool
Raises:TypeError
_validate_ssl_options(ssl_options)

Validate the SSL options value is a dictionary.

Parameters:ssl_options (dict|None) – SSL Options to validate
Return type:bool
Raises:TypeError
_validate_virtual_host(virtual_host)

Validate that the virtual_host value is a str

Parameters:virtual_host (str) – The value to validate
Return type:bool
Raises:TypeError

Connection

class pika.connection.Connection(parameters=None, on_open_callback=None)[source]

This is the core class that implements communication with RabbitMQ. This class should not be invoked directly but rather through the use of an adapter such as SelectConnection or BlockingConnection.

CONNECTION_CLOSED = 0
CONNECTION_INIT = 1
CONNECTION_PROTOCOL = 2
CONNECTION_START = 3
CONNECTION_TUNE = 4
CONNECTION_OPEN = 5
CONNECTION_CLOSING = 6
add_backpressure_callback(callback_method)[source]

Call callback_method when Pika believes backpressure is being applied.

Parameters:callback_method (method) – The method to call
add_on_close_callback(callback_method)[source]

Add a callback notification when the connection has closed.

Parameters:callback_method (method) – The callback when the connection is closed
add_on_open_callback(callback_method)[source]

Add a callback notification when the connection has opened.

Parameters:callback_method (method) – The callback when the connection is opened
add_timeout(deadline, callback_method)[source]

Adapters should override to call the callback after the specified number of seconds have elapsed, using a timer, or a thread, or similar.

Parameters:
  • deadline (int) – The number of seconds to wait to call callback
  • callback_method (method) – The callback method
channel(on_open_callback, channel_number=None)[source]

Create a new channel with the next available channel number, or pass in a channel number to use. A channel number you specify must be non-zero, but it is recommended that you let Pika manage the channel numbers.

Parameters:
  • on_open_callback (method) – The callback when the channel is opened
  • channel_number (int) – The channel number to use, defaults to the next available.
close(reply_code=200, reply_text='Normal shutdown')[source]

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

Parameters:
  • reply_code (int) – The code number for the close
  • reply_text (str) – The text reason for the close
remove_timeout(callback_method)[source]

Adapters should override to remove the timeout previously registered for the specified callback.

Parameters:callback_method (method) – The callback to remove a timeout for
set_backpressure_multiplier(value=10)[source]

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Parameters:value (int) – The multiplier value to set
is_closed

Returns True if the connection is closed.

is_closing

Returns True if the connection is closing.

is_open

Returns True if the connection is open.

basic_nack

Specifies if the server supports basic.nack on the active connection.

Return type:bool
consumer_cancel_notify

Specifies if the server supports consumer cancel notification on the active connection.

Return type:bool
exchange_exchange_bindings

Specifies if the active connection supports exchange to exchange bindings.

Return type:bool
publisher_confirms

Specifies if the active connection can use publisher confirmations.

Return type:bool
_adapter_connect()[source]

Subclasses should override to set up the outbound socket connection.

Raises:NotImplementedError
_adapter_disconnect()[source]

Subclasses should override this to cause the underlying transport (socket) to close.

Raises:NotImplementedError
_add_channel_callbacks(channel_number)[source]

Add the appropriate callbacks for the specified channel number.

Parameters:channel_number (int) – The channel number for the callbacks
_add_connection_start_callback()[source]

Add a callback for when a Connection.Start frame is received from the broker.

_add_connection_tune_callback()[source]

Add a callback for when a Connection.Tune frame is received.

_append_frame_buffer(bytes)[source]

Append the bytes to the frame buffer.

Parameters:bytes (str) – The bytes to append to the frame buffer
_buffer_size

Return the suggested buffer size from the connection state/tune or the default if that is None.

Return type:int
_check_for_protocol_mismatch(value)[source]

Invoked when starting a connection to make sure it’s a supported protocol.

Parameters:value (pika.frame.Method) – The frame to check
Raises:ProtocolVersionMismatch
_client_properties

Return the client properties dictionary.

Return type:dict
_close_channels(reply_code, reply_text)[source]

Close the open channels with the specified reply_code and reply_text.

Parameters:
  • reply_code (int) – The code for why the channels are being closed
  • reply_text (str) – The text reason for why the channels are closing
_combine(a, b)[source]

Pass in two values. If a is 0, return b; otherwise, if b is 0, return a. If neither is 0, return the smaller value.

Parameters:
  • a (int) – The first value
  • b (int) – The second value
Return type:

int
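
The rule described for _combine can be written as a stand-alone function. The name combine is used here only for illustration. Treating zero as "no limit" is an assumption about why zero is special-cased, based on how AMQP tuning values such as channel_max are usually interpreted.

```python
def combine(a, b):
    """Stand-alone version of the rule described for _combine."""
    if a == 0:
        return b
    if b == 0:
        return a
    # Neither side is zero, so the stricter (smaller) value wins
    return min(a, b)


print(combine(0, 131072))     # 131072
print(combine(4096, 0))       # 4096
print(combine(4096, 131072))  # 4096
```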

_connect()[source]

Call the Adapter’s connect method after letting the ReconnectionStrategy know.

_create_channel(channel_number, on_open_callback)[source]

Create a new channel using the specified channel number and calling back the method specified by on_open_callback

Parameters:
  • channel_number (int) – The channel number to use
  • on_open_callback (method) – The callback when the channel is opened
_create_heartbeat_checker()[source]

Create a heartbeat checker instance if there is a heartbeat interval set.

Return type:pika.heartbeat.Heartbeat
_deliver_frame_to_channel(value)[source]

Deliver the frame to the channel specified in the frame.

Parameters:value (pika.frame.Method) – The frame to deliver
_detect_backpressure()[source]

Attempt to calculate if TCP backpressure is being applied due to our outbound buffer being larger than the average frame size over a window of frames.
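
A minimal sketch of such a check follows, assuming the outbound buffer is a sequence of encoded frames and that the backpressure multiplier scales the average frame size to form the threshold. The function name, its parameters, and the exact comparison are assumptions, not Pika's code.

```python
def backpressure_detected(outbound_buffer, bytes_sent, frames_sent,
                          multiplier=10):
    """Hypothetical check; all parameter names are illustrative assumptions."""
    if frames_sent == 0:
        return False  # no history yet, so no average to compare against
    average_frame_size = bytes_sent / float(frames_sent)
    buffered_bytes = sum(len(frame) for frame in outbound_buffer)
    # Flag backpressure once the backlog exceeds `multiplier` average frames
    return buffered_bytes > average_frame_size * multiplier
```

With an average frame size of 100 bytes and the default multiplier of 10, this sketch reports backpressure once more than 1000 bytes are waiting in the buffer.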

_ensure_closed()[source]

If the connection is not closed, close it.

_flush_outbound()[source]

Adapters should override to flush the contents of outbound_buffer out along the socket.

Raises:NotImplementedError
_get_body_frame_max_length()[source]

Calculate the maximum number of bytes that can be in a body frame.

Return type:int
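
In AMQP 0-9-1 every frame carries a 7-byte header (1-byte type, 2-byte channel, 4-byte payload size) and a 1-byte frame-end marker, so the body payload is the negotiated frame size minus that overhead. The sketch below shows the arithmetic. The constant and function names are assumptions for illustration.

```python
FRAME_HEADER_SIZE = 7  # type (1 byte) + channel (2 bytes) + payload size (4 bytes)
FRAME_END_SIZE = 1     # the frame-end octet


def body_frame_max_length(frame_max):
    """Hypothetical helper: payload bytes that fit in one body frame."""
    return frame_max - FRAME_HEADER_SIZE - FRAME_END_SIZE


print(body_frame_max_length(131072))  # 131064
```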
_get_credentials(method_frame)[source]

Get credentials for authentication.

Parameters:method_frame (pika.frame.MethodFrame) – The Connection.Start frame
Return type:tuple(str, str)
_has_open_channels

Returns true if channels are open.

Return type:bool
_has_pending_callbacks(value)[source]

Return true if there are any callbacks pending for the specified frame.

Parameters:value (pika.frame.Method) – The frame to check
Return type:bool
_init_connection_state()[source]

Initialize or reset all of the internal state variables for a given connection. On disconnect or reconnect all of the state needs to be wiped.

_is_basic_deliver_frame(frame_value)[source]

Returns true if the frame is a Basic.Deliver

Parameters:frame_value (pika.frame.Method) – The frame to check
Return type:bool
_is_connection_close_frame(value)[source]

Returns true if the frame is a Connection.Close frame.

Parameters:value (pika.frame.Method) – The frame to check
Return type:bool
_is_method_frame(value)[source]

Returns true if the frame is a method frame.

Parameters:value (pika.frame.Frame) – The frame to evaluate
Return type:bool
_is_protocol_header_frame(value)[source]

Returns True if it’s a protocol header frame.

Return type:bool
_next_channel_number()[source]

Return the next available channel number or raise an exception.

Return type:int
_on_channel_closeok(method_frame)[source]

Remove the channel from the dict of channels when Channel.CloseOk is sent.

Parameters:method_frame (spec.Channel.CloseOk) – The response
_on_close_ready()[source]

Called when the Connection is in a state that it can close after a close has been requested. This happens, for example, when all of the channels are closed that were open when the close request was made.

_on_connected()[source]

This is called by our connection Adapter to let us know that we’ve connected and we can notify our connection strategy.

_on_connection_closed(method_frame, from_adapter=False)[source]

Called when the connection is closed remotely. The from_adapter value will be true if the connection adapter has been disconnected from the broker and the method was invoked directly instead of by receiving a Connection.Close frame.

Parameters:
  • method_frame (pika.frame.Method) – The Connection.Close frame
  • from_adapter (bool) – Called by the connection adapter
_on_connection_open(method_frame)[source]

This is called once we have tuned the connection with the server, sent Connection.Open to the server, and received Connection.OpenOk in reply.

_on_connection_start(method_frame)[source]

This is called as a callback once we have received a Connection.Start from the server.

Parameters:method_frame (pika.frame.Method) – The frame received
Raises:UnexpectedFrameError
_on_connection_tune(method_frame)[source]

Once the broker sends back a Connection.Tune, we set the tuning variables that have been returned to us, start the heartbeat monitor if required, send our TuneOk, and then make the Connection.Open RPC call on channel 0.

Parameters:method_frame (pika.frame.Method) – The frame received
_on_data_available(data_in)[source]

This is called by our adapter, passing in the data from the socket. As long as the frame buffer holds data, try to decode frames from it.

Parameters:data_in (str) – The data that is available to read
_process_callbacks(frame_value)[source]

Process the callbacks for the frame if the frame is a method frame and if it has any callbacks pending.

Parameters:frame_value (pika.frame.Method) – The frame to process
Return type:bool
_process_connection_closed_callbacks()[source]

Process any callbacks that should be called when the connection is closed.

_process_frame(frame_value)[source]

Process an inbound frame from the socket.

Parameters:frame_value (pika.frame.Frame | pika.frame.Method) – The frame to process
_read_frame()[source]

Try to read from the frame buffer and decode a frame.

Return type:tuple(int, pika.frame.Frame)
_reject_out_of_band_delivery(channel_number, delivery_tag)[source]

Reject a delivery on the specified channel number and delivery tag because said channel no longer exists.

Parameters:
  • channel_number (int) – The channel number
  • delivery_tag (int) – The delivery tag
_remove_callback(channel_number, method_frame)[source]

Remove the specified method_frame callback if it is set for the specified channel number.

Parameters:
  • channel_number (int) – The channel number to remove the callback on
  • method_frame (pika.object.Method) – The method frame for the callback
_remove_callbacks(channel_number, method_frames)[source]

Remove the callbacks for the specified channel number and list of method frames.

Parameters:
  • channel_number (int) – The channel number to remove the callback on
  • method_frames (list) – The method frames for the callback
_remove_connection_callbacks()[source]

Remove all callbacks for the connection

_rpc(channel_number, method_frame, callback_method=None, acceptable_replies=None)[source]

Make an RPC call for the given callback, channel number and method. acceptable_replies lists out what responses we’ll process from the server with the specified callback.

Parameters:
  • channel_number (int) – The channel number for the RPC call
  • method_frame (pika.object.Method) – The method frame to call
  • callback_method (method) – The callback for the RPC response
  • acceptable_replies (list) – The replies this RPC call expects
_send_connection_close(reply_code, reply_text)[source]

Send a Connection.Close method frame.

Parameters:
  • reply_code (int) – The reason for the close
  • reply_text (str) – The text reason for the close
_send_connection_open()[source]

Send a Connection.Open frame

_send_connection_start_ok(authentication_type, response)[source]

Send a Connection.StartOk frame

Parameters:
  • authentication_type (str) – The auth type value
  • response (str) – The encoded value to send
_send_connection_tune_ok()[source]

Send a Connection.TuneOk frame

_send_frame(frame_value)[source]

This appends the fully generated frame to send to the broker to the output buffer which will be then sent via the connection adapter.

Parameters:frame_value (pika.frame.Frame|pika.frame.ProtocolHeader) – The frame to write
_send_method(channel_number, method_frame, content=None)[source]

Constructs an RPC method frame and then sends it to the broker.

Parameters:
  • channel_number (int) – The channel number for the frame
  • method_frame (pika.object.Method) – The method frame to send
  • content (tuple) – If set, the content to send with the method, as a tuple of properties and body.
_set_connection_state(connection_state)[source]

Set the connection state.

Parameters:connection_state (int) – The connection state to set
_set_server_information(method_frame)[source]

Set the server properties and capabilities

Parameters:method_frame (spec.connection.Start) – The Connection.Start frame
_trim_frame_buffer(byte_count)[source]

Trim the leading N bytes off the frame buffer and increment the counter that keeps track of how many bytes have been read/used from the socket.

Parameters:byte_count (int) – The number of bytes consumed
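
The append and trim steps described for the frame buffer can be pictured with the sketch below. The class FrameBufferSketch and its attribute names are hypothetical. It only illustrates dropping the consumed bytes and advancing a received-bytes counter.

```python
class FrameBufferSketch(object):
    """Hypothetical stand-in for the connection's frame buffer state."""

    def __init__(self):
        self.frame_buffer = b''
        self.bytes_received = 0

    def append(self, data):
        # Add newly read socket data to the end of the buffer
        self.frame_buffer += data

    def trim(self, byte_count):
        # Drop the consumed bytes and record how many were used
        self.frame_buffer = self.frame_buffer[byte_count:]
        self.bytes_received += byte_count
```

After appending six bytes and trimming four, two bytes remain in the buffer and the counter reads four.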