Clients#

Classes for RabbitMQ clients.

Warning

Importing this module sets the level of the “pika” logger to WARNING, so that consumers can use the DEBUG and INFO levels without their messages getting lost in Pika’s verbosity.
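
The effect described in this warning can be reproduced with the standard logging module. The sketch below is not the module's own code. It shows an equivalent call, and how an application might opt back in to Pika's output while debugging. The helper name show_pika_debug_output is made up for illustration.

```python
import logging

# Equivalent of what the warning describes: raise the "pika" logger's threshold.
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.WARNING)

# DEBUG and INFO records from Pika are now dropped ...
assert not pika_logger.isEnabledFor(logging.INFO)
# ... while WARNING and above still pass.
assert pika_logger.isEnabledFor(logging.WARNING)


def show_pika_debug_output() -> None:
    """Opt back in to Pika's verbose output (hypothetical helper)."""
    logging.getLogger("pika").setLevel(logging.DEBUG)
```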

class yapw.clients.Base(*, url='amqp://127.0.0.1', blocked_connection_timeout=1800, durable=True, exchange='', exchange_type=ExchangeType.direct, prefetch_count=1, decode=<function default_decode>, encode=<function default_encode>, content_type='application/json', routing_key_template='{exchange}_{routing_key}')[source]#

Base class providing common functionality to other clients. You cannot use this class directly.

When consuming a message, by default, its body is decoded using yapw.util.default_decode(). Use the decode keyword argument to change this. The decode argument must be a function that accepts (state, channel, method, properties, body) arguments (like the consumer callback) and returns a decoded message.

When publishing a message, by default, its body is encoded using yapw.util.default_encode(), and its content type is set to “application/json”. Use the encode and content_type keyword arguments to change this. The encode argument must be a function that accepts (message, content_type) arguments and returns bytes.
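
A custom encoder only needs to honour the (message, content_type) to bytes contract stated above. The following is a minimal sketch built on the standard json module. The function name encode_json and its handling of other content types are assumptions, not the behaviour of yapw.util.default_encode().

```python
import json
from typing import Any


def encode_json(message: Any, content_type: str) -> bytes:
    """Hypothetical encoder: accepts (message, content_type) and returns bytes."""
    if content_type != "application/json":
        raise ValueError(f"unsupported content type: {content_type}")
    # Compact separators and sorted keys keep the output deterministic.
    return json.dumps(message, separators=(",", ":"), sort_keys=True).encode("utf-8")


body = encode_json({"b": 2, "a": 1}, "application/json")
```

Such a function could then be passed as the encode keyword argument, together with a matching content_type.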

format_routing_key() must be used by methods in subclasses that accept routing keys, in order to namespace the routing keys.

connection: T#

The connection.

channel: Channel | BlockingChannel#

The channel.

__safe__ = frozenset({'connection', 'content_type', 'delivery_mode', 'encode', 'exchange', 'format_routing_key', 'interrupt'})#

Attributes that can, and are expected to, be used safely in consumer callbacks.

__init__(*, url='amqp://127.0.0.1', blocked_connection_timeout=1800, durable=True, exchange='', exchange_type=ExchangeType.direct, prefetch_count=1, decode=<function default_decode>, encode=<function default_encode>, content_type='application/json', routing_key_template='{exchange}_{routing_key}')[source]#
Parameters:
  • url (str) – the connection string (don’t set a blocked_connection_timeout query string parameter)

  • blocked_connection_timeout (float) – the timeout, in seconds, that the connection may remain blocked

  • durable (bool) – whether to declare a durable exchange, declare durable queues, and publish persistent messages

  • exchange (str) – the exchange name

  • exchange_type (ExchangeType) – the exchange type

  • prefetch_count (int) – the maximum number of unacknowledged deliveries that are permitted on the channel

  • decode (yapw.types.Decode) – the message bodies’ decoder

  • encode (yapw.types.Encode) – the message bodies’ encoder

  • content_type (str) – the messages’ content type

  • routing_key_template (str) – a format string that must contain the {routing_key} replacement field and that may contain other fields matching writable attributes

parameters#

The RabbitMQ connection parameters.

durable#

Whether to declare a durable exchange, declare durable queues, and publish persistent messages.

exchange#

The exchange name.

exchange_type#

The exchange type.

prefetch_count#

The maximum number of unacknowledged messages per consumer.

decode#

The message bodies’ decoder.

encode#

The message bodies’ encoder.

content_type#

The messages’ content type.

routing_key_template#

The format string for the routing key.

delivery_mode#

The messages’ delivery mode.

consumer_tag#

The consumer’s tag.

format_routing_key(routing_key)[source]#

Namespace the routing key.

Parameters:

routing_key (str) – the routing key

Returns:

the formatted routing key

Return type:

str
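
As an illustration of the namespacing, the default routing_key_template can be expanded with str.format(). This is a sketch of the idea, not the library's implementation. The helper name format_with_template and the "ocds" exchange name are made up.

```python
def format_with_template(template: str, routing_key: str, **attributes: str) -> str:
    """Expand a routing key template (hypothetical helper, for illustration)."""
    # Extra keyword arguments that the template does not use are ignored.
    return template.format(routing_key=routing_key, **attributes)


default_template = "{exchange}_{routing_key}"
namespaced = format_with_template(default_template, "download", exchange="ocds")
print(namespaced)  # ocds_download
```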

publish(message, routing_key)[source]#

Publish the message with the routing_key to the configured exchange, from the IO loop thread.

Parameters:
  • message (Any) – a decoded message

  • routing_key (str) – the routing key

Return type:

None

add_signal_handlers(handler)[source]#

Add handlers for the SIGTERM and SIGINT signals, if the current thread is the main thread.

Parameters:

handler (Callable[..., object]) –

Return type:

None

add_signal_handler(signalnum, handler)[source]#

Override this method in subclasses to add a handler for a signal (e.g. using signal.signal() or asyncio.loop.add_signal_handler()). The handler should remove signal handlers (in order to ignore duplicate signals), log a message with a level of INFO, and call yapw.clients.Base.interrupt().

Parameters:
  • signalnum (int) –

  • handler (Callable[..., object]) –

Return type:

None
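
The three duties of the handler (ignore duplicate signals, log at INFO, call interrupt()) might look as follows with signal.signal(). This is a sketch under assumptions: SketchClient is a hypothetical stand-in rather than a yapw class, and replacing the handlers with signal.SIG_IGN is only one way to "remove" them.

```python
import logging
import signal
import threading
from typing import Any

logger = logging.getLogger(__name__)


class SketchClient:
    """Hypothetical stand-in for a client subclass; not part of yapw."""

    def __init__(self) -> None:
        self.interrupted = False

    def add_signal_handler(self, signalnum: int, handler: Any) -> None:
        # signal.signal() raises ValueError outside the main thread, so guard it.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signalnum, handler)

    def interrupt(self) -> None:
        # A real client would shut down gracefully here.
        self.interrupted = True

    def on_signal(self, signalnum: int, frame: Any = None) -> None:
        # Ignore duplicate signals while the shutdown is in progress.
        for duplicate in (signal.SIGTERM, signal.SIGINT):
            self.add_signal_handler(duplicate, signal.SIG_IGN)
        logger.info("Received %s, shutting down gracefully", signal.Signals(signalnum).name)
        self.interrupt()
```

A caller would register on_signal for both signals, for example client.add_signal_handler(signal.SIGTERM, client.on_signal).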

interrupt()[source]#

Override this method in subclasses to shut down gracefully (e.g. wait for threads to terminate).

Return type:

None

property state#

A named tuple of attributes that can be used within threads.

class yapw.clients.Blocking(**kwargs)[source]#

Uses Pika’s BlockingConnection adapter.

Parameters:

kwargs (Any) –

__init__(**kwargs)[source]#

Connect to RabbitMQ, create a channel, set the prefetch count, and declare an exchange, unless using the default exchange.

Parameters:

kwargs (Any) –

connection: T#

The connection.

channel: BlockingChannel#

The channel.

declare_queue(queue, routing_keys=None, arguments=None)[source]#

Declare a queue, and bind it to the exchange with the routing keys. If no routing keys are provided, the queue is bound to the exchange using its name as the routing key.

Parameters:
  • queue (str) – the queue’s name

  • routing_keys (list[str] | None) – the queue’s routing keys

  • arguments (dict[str, str] | None) – any custom key-value arguments

Return type:

None
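
The fallback to the queue's name can be sketched in a few lines. The helper resolve_binding_keys is hypothetical, and treating an empty list like None is an assumption.

```python
from typing import Optional


def resolve_binding_keys(queue: str, routing_keys: Optional[list[str]] = None) -> list[str]:
    """Fall back to the queue's name when no routing keys are given (hypothetical helper)."""
    return list(routing_keys) if routing_keys else [queue]


default_keys = resolve_binding_keys("work")
explicit_keys = resolve_binding_keys("work", ["created", "updated"])
```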

consume(on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None)[source]#

Declare a queue, bind it to the exchange with the routing keys, and start consuming messages from that queue.

If no routing_keys are provided, the queue is bound to the exchange using its name as the routing key.

Run the consumer callback in separate threads, to not block the IO loop. Add signal handlers to wait for threads to terminate.

The consumer callback is a function that accepts (state, channel, method, properties, body) arguments. The state argument contains thread-safe attributes. The rest of the arguments are the same as pika.channel.Channel.basic_consume()’s on_message_callback.

Parameters:
  • on_message_callback (yapw.types.ConsumerCallback) – the consumer callback

  • queue (str) – the queue’s name

  • routing_keys (list[str] | None) – the queue’s routing keys

  • decorator (yapw.types.Decorator) – the decorator of the consumer callback

  • arguments (dict[str, str] | None) – the arguments parameter to the queue_declare method

Return type:

None
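
A consumer callback with the five-argument signature described above could be sketched as follows. The objects passed in are stand-ins built with types.SimpleNamespace, not real Pika or yapw objects, and acknowledging the message is left out.

```python
from types import SimpleNamespace
from typing import Any

processed: list[Any] = []


def on_message(state: Any, channel: Any, method: Any, properties: Any, body: Any) -> None:
    """Hypothetical consumer callback with the five-argument signature."""
    # `state` carries the thread-safe attributes; here only `exchange` is read.
    processed.append((state.exchange, method.delivery_tag, body))


# Stand-ins for the objects that the client and Pika would pass in.
state = SimpleNamespace(exchange="my_exchange")
method = SimpleNamespace(delivery_tag=1)
properties = SimpleNamespace(content_type="application/json")
on_message(state, None, method, properties, {"id": 7})
```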

channel_cancel_callback(method)[source]#

Cancel the consumer, which causes the threads to terminate and the connection to close.

RabbitMQ uses basic.cancel if a channel is consuming a queue and the queue is deleted.

Parameters:

method (Cancel) –

Return type:

Any

add_signal_handler(signalnum, handler)[source]#

Add a handler for a signal.

Parameters:
  • signalnum (int) –

  • handler (Callable[..., object]) –

Return type:

None

interrupt()[source]#

Cancel the consumer, which causes the threads to terminate and the connection to close.

Return type:

None

close()[source]#

Close the connection, for example after sending messages from a simple publisher.

Return type:

None

class yapw.clients.Async(**kwargs)[source]#

Uses Pika’s AsyncioConnection adapter. Reconnects to RabbitMQ if the connection is closed unexpectedly or can’t be established.

Calling start() connects to RabbitMQ, adds signal handlers, and starts the IO loop.

The signal handlers cancel the consumer, if consuming and if the channel is open. Otherwise, they wait for threads to terminate and close the connection. This also occurs if the broker cancels the consumer or if the channel closes for any other reason.

Once the IO loop starts, the client creates a channel, sets the prefetch count, and declares the exchange. Once the exchange is declared, exchange_declareok_callback() calls exchange_ready(). You must define exchange_ready() in a subclass, to do any work you need.
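
The order of these steps can be pictured with a stand-in class. SketchAsync and Worker below are hypothetical and do not talk to RabbitMQ. The real callbacks receive connection, channel or method arguments, which are omitted here.

```python
class SketchAsync:
    """Hypothetical stand-in that mimics the order of the callbacks named above."""

    def __init__(self) -> None:
        self.steps: list[str] = []

    def connection_open_callback(self) -> None:
        self.steps.append("open channel")
        self.channel_open_callback()

    def channel_open_callback(self) -> None:
        self.steps.append("set prefetch count")
        self.channel_qosok_callback()

    def channel_qosok_callback(self) -> None:
        self.steps.append("declare exchange")
        self.exchange_declareok_callback()

    def exchange_declareok_callback(self) -> None:
        self.exchange_ready()

    def exchange_ready(self) -> None:
        """Hook for subclasses; does nothing by default."""


class Worker(SketchAsync):
    def exchange_ready(self) -> None:
        self.steps.append("exchange ready: start work")


worker = Worker()
worker.connection_open_callback()
```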

If the connection becomes blocked or unblocked, the client’s blocked attribute is set to True or False, respectively. Your code can use this attribute to, for example, pause, buffer or reschedule deliveries.
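
One way to act on the blocked attribute is to buffer outgoing messages until the connection is unblocked. The class below is a sketch with made-up names (BufferingPublisher, send, flush). It wraps any publish-like callable and is not part of yapw.

```python
from collections import deque
from typing import Any, Callable


class BufferingPublisher:
    """Hypothetical wrapper that holds messages back while `blocked` is True."""

    def __init__(self, publish: Callable[[Any, str], None]) -> None:
        self.publish = publish
        self.blocked = False
        self.buffer: deque[tuple[Any, str]] = deque()

    def send(self, message: Any, routing_key: str) -> None:
        if self.blocked:
            self.buffer.append((message, routing_key))
        else:
            self.publish(message, routing_key)

    def flush(self) -> None:
        # Call this once the connection is unblocked.
        while self.buffer and not self.blocked:
            self.publish(*self.buffer.popleft())


sent: list[tuple[Any, str]] = []
publisher = BufferingPublisher(lambda message, routing_key: sent.append((message, routing_key)))
publisher.blocked = True
publisher.send({"id": 1}, "work")  # held in the buffer
publisher.blocked = False
publisher.flush()  # delivered now
```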

If you subclass this client and add and mutate any attributes, override reset().

Parameters:

kwargs (Any) –

__init__(**kwargs)[source]#

Initialize the client’s state.

Parameters:

kwargs (Any) –

executor#

The thread pool executor.

blocked#

Whether the connection is blocked.

stopping#

Whether the client is being stopped deliberately.

property thread_name_infix: str#

Return the exchange name to use as part of the thread name.

start()[source]#

Connect to RabbitMQ, add signal handlers, and start the IO loop.

Return type:

None

connect()[source]#

Connect to RabbitMQ, create a channel, set the prefetch count, and declare an exchange, unless using the default exchange.

Return type:

None

connection_blocked_callback(connection, method)[source]#

Mark the client as blocked.

Subclasses must implement any logic for pausing deliveries or filling buffers.

Parameters:
  • connection –

  • method –

Return type:

None

connection_unblocked_callback(connection, method)[source]#

Mark the client as unblocked.

Subclasses must implement any logic for resuming deliveries or clearing buffers.

Parameters:
  • connection –

  • method –

Return type:

None

reconnect()[source]#

Reconnect to RabbitMQ, unless a signal was received while the timer was running. If so, stop the IO loop.

Return type:

None

reset()[source]#

Reset the client’s state, before reconnecting.

Override this method in subclasses, if your subclass adds and mutates any attributes.

Return type:

None
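
Overriding reset() in a subclass that keeps its own mutable state might look like this. SketchAsyncBase is a hypothetical stand-in for the parent client, and what the real reset() clears is not shown here.

```python
class SketchAsyncBase:
    """Hypothetical stand-in for the parent client; only reset() matters here."""

    def __init__(self) -> None:
        self.blocked = False

    def reset(self) -> None:
        self.blocked = False


class CountingClient(SketchAsyncBase):
    """Adds a mutable attribute, so it also overrides reset()."""

    def __init__(self) -> None:
        super().__init__()
        self.pending: list[str] = []

    def reset(self) -> None:
        super().reset()
        # Clear per-connection state so that a reconnect starts clean.
        self.pending.clear()


client = CountingClient()
client.blocked = True
client.pending.append("message-1")
client.reset()
```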

connection_open_error_callback(connection, error)[source]#

Retry, if the connection couldn’t be established.

Parameters:
  • connection –

  • error –

Return type:

None

connection_close_callback(connection, reason)[source]#

Reconnect, if the connection was closed unexpectedly. Otherwise, stop the IO loop.

Parameters:
  • connection –

  • reason –

Return type:

None

add_signal_handler(signalnum, handler)[source]#

Add a handler for a signal.

Parameters:
  • signalnum (int) –

  • handler (Callable[..., object]) –

Return type:

None

interrupt()[source]#

Cancel the consumer if consuming and if the channel is open. Otherwise, wait for threads to terminate and close the connection.

Return type:

None

connection_open_callback(connection)[source]#

Open a channel, once the connection is open.

Parameters:

connection (Connection) –

Return type:

None

channel_open_callback(channel)[source]#

Set the prefetch count, once the channel is open.

Parameters:

channel (Channel) –

Return type:

None

channel_cancelok_callback(method)[source]#

Close the channel, once the consumer is cancelled. The channel_close_callback() closes the connection.

Parameters:

method (pika.frame.Method[pika.spec.Basic.CancelOk]) –

Return type:

Any

channel_close_callback(channel, reason)[source]#

Close the connection, once the client cancelled the consumer or once RabbitMQ closed the channel due to, e.g., redeclaring exchanges with inconsistent parameters.

A warning is logged in the latter case.

Parameters:
  • channel –

  • reason –

Return type:

None

channel_qosok_callback(method)[source]#

Declare the exchange, once the prefetch count is set.

Parameters:

method (pika.frame.Method[pika.spec.Basic.QosOk]) –

Return type:

None

exchange_declareok_callback(method)[source]#

Perform user-specified actions, once the exchange is declared.

Parameters:

method (pika.frame.Method[pika.spec.Exchange.DeclareOk]) –

Return type:

None

exchange_ready()[source]#

Override this method in subclasses. It is called once the exchange is declared.

Return type:

None

class yapw.clients.AsyncConsumer(*, on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None, **kwargs)[source]#

An asynchronous consumer, extending Async.

After calling start(), this client declares the queue, binds it to the exchange with the routing_keys, and starts consuming messages from that queue, using the on_message_callback.

The on_message_callback and queue keyword arguments are required. If no routing_keys are provided, the queue is bound to the exchange using its name as the routing key.

The pika.channel.Channel.basic_consume() call sets its callback to an empty method, channel_consumeok_callback(). Define this method in a subclass, if needed.

__init__(*, on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None, **kwargs)[source]#
Parameters:
  • on_message_callback (yapw.types.ConsumerCallback) – the consumer callback

  • queue (str) – the queue’s name

  • routing_keys (list[str] | None) – the queue’s routing keys

  • decorator (yapw.types.Decorator) – the decorator of the consumer callback

  • arguments (dict[str, Any] | None) – the arguments parameter to the queue_declare method

  • kwargs (Any) –

Return type:

None

queue#

The queue’s name.

routing_keys#

The queue’s routing keys.

arguments#

The arguments parameter to the queue_declare method.

on_message_callback#

The consumer callback.

decorator#

The decorator of the consumer callback.

property thread_name_infix: str#

Return the queue name to use as part of the thread name.

exchange_ready()[source]#

Declare the queue, once the exchange is declared.

Return type:

None

queue_declareok_callback(method, queue_name)[source]#

Bind the queue to the first routing key, once the queue is declared.

Parameters:
  • method –

  • queue_name –

Return type:

None

queue_bindok_callback(method, queue_name, index)[source]#

Bind the queue to the remaining routing keys, or start consuming if all routing keys are bound.

Parameters:
  • method –

  • queue_name –

  • index –

Return type:

None
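
The index argument suggests that the bindings are made one at a time, each confirmation triggering the next. A sketch of that pattern, with a hypothetical fake_queue_bind standing in for the asynchronous bind call:

```python
from typing import Callable

bound: list[str] = []
events: list[str] = []


def fake_queue_bind(queue: str, routing_key: str, callback: Callable[[], None]) -> None:
    """Stand-in for an asynchronous bind: record it, then confirm via the callback."""
    bound.append(routing_key)
    callback()


def bind_from(queue: str, routing_keys: list[str], index: int) -> None:
    """Bind routing_keys[index], or start consuming once every key is bound."""
    if index < len(routing_keys):
        fake_queue_bind(queue, routing_keys[index], lambda: bind_from(queue, routing_keys, index + 1))
    else:
        events.append(f"consume {queue}")


bind_from("work", ["work", "retry"], 0)
```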

consume(on_message_callback, decorator, queue_name)[source]#

Start consuming messages from the queue.

Run the consumer callback in separate threads, to not block the IO loop. (This assumes the consumer callback is CPU-bound.) Add signal handlers to wait for threads to terminate.

The consumer callback is a function that accepts (state, channel, method, properties, body) arguments. The state argument contains thread-safe attributes. The rest of the arguments are the same as pika.channel.Channel.basic_consume()’s on_message_callback.

Parameters:
  • on_message_callback (yapw.types.ConsumerCallback) –

  • decorator (yapw.types.Decorator) –

  • queue_name (str) –

Return type:

None
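
Running the callback in worker threads, so that the thread driving the IO loop only hands off work, can be sketched with concurrent.futures. This is a simplified illustration: the callback takes a single argument, and the thread name prefix is made up.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results: list[tuple[str, bool]] = []
io_thread = threading.current_thread()


def on_message(body: str) -> None:
    """Simplified callback: record the body and whether it ran off the IO thread."""
    results.append((body, threading.current_thread() is not io_thread))


# The "IO loop" only submits work; the pool's worker threads run the callback.
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="yapw-sketch") as executor:
    futures = [executor.submit(on_message, body) for body in ("a", "b", "c")]
    for future in futures:
        future.result()  # re-raise any exception from the worker thread
```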

channel_cancel_callback(method)[source]#

Close the channel.

RabbitMQ uses basic.cancel if a channel is consuming a queue and the queue is deleted.

Parameters:

method (Any) –

Return type:

Any

channel_consumeok_callback(method)[source]#

Override this method in subclasses to perform any other work, once the consumer is started.

Parameters:

method (pika.frame.Method[pika.spec.Basic.ConsumeOk]) –

Return type:

None