Clients
Classes for RabbitMQ clients.
Warning
Importing this module sets the level of the “pika” logger to WARNING, so that consumers can use the DEBUG and INFO levels without their messages getting lost in Pika’s verbosity.
- class yapw.clients.Base(*, url='amqp://127.0.0.1', blocked_connection_timeout=1800, durable=True, exchange='', exchange_type=ExchangeType.direct, prefetch_count=1, decode=<function default_decode>, encode=<function default_encode>, content_type='application/json', routing_key_template='{exchange}_{routing_key}')[source]#
Base class providing common functionality to other clients. You cannot use this class directly.
When consuming a message, by default, its body is decoded using yapw.util.default_decode(). Use the decode keyword argument to change this. The decode must be a function that accepts (state, channel, method, properties, body) arguments (like the consumer callback) and returns a decoded message.
When publishing a message, by default, its body is encoded using yapw.util.default_encode(), and its content type is set to “application/json”. Use the encode and content_type keyword arguments to change this. The encode must be a function that accepts (message, content_type) arguments and returns bytes.
format_routing_key() must be used by methods in subclasses that accept routing keys, in order to namespace the routing keys.
- Parameters:
- connection: T#
The connection.
- channel: Channel | BlockingChannel#
The channel.
- __safe__ = frozenset({'connection', 'content_type', 'delivery_mode', 'encode', 'exchange', 'format_routing_key', 'interrupt'})#
Attributes that can - and are expected to - be used safely in consumer callbacks.
- __init__(*, url='amqp://127.0.0.1', blocked_connection_timeout=1800, durable=True, exchange='', exchange_type=ExchangeType.direct, prefetch_count=1, decode=<function default_decode>, encode=<function default_encode>, content_type='application/json', routing_key_template='{exchange}_{routing_key}')[source]#
- Parameters:
url (str) – the connection string (don’t set a blocked_connection_timeout query string parameter)
blocked_connection_timeout (float) – the timeout, in seconds, that the connection may remain blocked
durable (bool) – whether to declare a durable exchange, declare durable queues, and publish persistent messages
exchange (str) – the exchange name
exchange_type (ExchangeType) – the exchange type
prefetch_count (int) – the maximum number of unacknowledged deliveries that are permitted on the channel
decode (yapw.types.Decode) – the message body’s decoder
encode (yapw.types.Encode) – the message bodies’ encoder
content_type (str) – the messages’ content type
routing_key_template (str) – a format string that must contain the {routing_key} replacement field and that may contain other fields matching writable attributes
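The decode and encode signatures can be illustrated with a minimal sketch. The function names encode_compact and decode_utf8_json are hypothetical, and the None placeholders stand in for the objects that a client would pass. Only the argument lists and return types are taken from the descriptions above.

```python
import json


def encode_compact(message, content_type):
    """Encode a message to bytes, as the ``encode`` argument requires."""
    if content_type == "application/json":
        # Compact separators keep the payload small.
        return json.dumps(message, separators=(",", ":")).encode()
    # Assumption: any other content type carries text or bytes as-is.
    return message if isinstance(message, bytes) else str(message).encode()


def decode_utf8_json(state, channel, method, properties, body):
    """Decode a message body, as the ``decode`` argument requires."""
    return json.loads(body.decode("utf-8"))


# Round trip, with placeholders for the arguments this sketch does not use.
payload = encode_compact({"id": 1, "tags": ["a", "b"]}, "application/json")
decoded = decode_utf8_json(None, None, None, None, payload)
```

Functions like these would be passed as the decode and encode keyword arguments when constructing a client.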
- parameters#
The RabbitMQ connection parameters.
- durable#
Whether to declare a durable exchange, declare durable queues, and publish persistent messages.
- exchange#
The exchange name.
- exchange_type#
The exchange type.
- prefetch_count#
The maximum number of unacknowledged messages per consumer.
- decode#
The message bodies’ decoder.
- encode#
The message bodies’ encoder.
- content_type#
The messages’ content type.
- routing_key_template#
The format string for the routing key.
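As a rough illustration of how a routing key template namespaces a routing key, the sketch below applies Python's str.format to the default template. The helper format_key is hypothetical and is not the library's format_routing_key(); the real method may differ in detail.

```python
DEFAULT_TEMPLATE = "{exchange}_{routing_key}"


def format_key(template, routing_key, **attributes):
    """Fill the template's replacement fields (hypothetical helper)."""
    # str.format ignores keyword arguments that the template does not use.
    return template.format(routing_key=routing_key, **attributes)


namespaced = format_key(DEFAULT_TEMPLATE, "publish", exchange="ocds")
unprefixed = format_key("{routing_key}", "publish", exchange="ocds")
```

With the default template, the exchange name prefixes every routing key, so that clients using different exchanges do not collide on routing key names.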
- delivery_mode#
The messages’ delivery mode.
- consumer_tag#
The consumer’s tag.
- publish(message, routing_key)[source]#
Publish the message with the routing_key to the configured exchange, from the IO loop thread.
- add_signal_handlers(handler)[source]#
Add handlers for the SIGTERM and SIGINT signals, if the current thread is the main thread.
- Parameters:
handler (Callable[..., object]) –
- Return type:
None
- add_signal_handler(signalnum, handler)[source]#
Override this method in subclasses to add a handler for a signal (e.g. using signal.signal() or asyncio.loop.add_signal_handler()). The handler should remove signal handlers (in order to ignore duplicate signals), log a message with a level of INFO, and call yapw.clients.base.interrupt().
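The handler contract described above (remove handlers, log at INFO, interrupt) might look like the following sketch, which uses signal.signal(). SignalSketch is a hypothetical stand-in and not a yapw class. Its interrupt() only sets a flag, where a real client would shut down gracefully.

```python
import logging
import signal
import threading

logger = logging.getLogger(__name__)
SIGNALS = (signal.SIGTERM, signal.SIGINT)


class SignalSketch:
    """Hypothetical stand-in for a client subclass; not part of yapw."""

    def __init__(self):
        self.interrupted = False

    def add_signal_handlers(self, handler):
        # Signal handlers can only be installed from the main thread.
        if threading.current_thread() is threading.main_thread():
            for signalnum in SIGNALS:
                self.add_signal_handler(signalnum, handler)

    def add_signal_handler(self, signalnum, handler):
        signal.signal(signalnum, handler)

    def interrupt(self):
        # A real client would wait for threads and close the connection here.
        self.interrupted = True

    def on_signal(self, signalnum, frame):
        # Ignore duplicate signals while shutting down.
        for duplicate in SIGNALS:
            signal.signal(duplicate, signal.SIG_IGN)
        name = signal.Signals(signalnum).name
        logger.info("Received %s, shutting down gracefully", name)
        self.interrupt()
```

A client built on asyncio would more likely register the handler with the event loop's add_signal_handler() instead, as the description suggests.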
- interrupt()[source]#
Override this method in subclasses to shut down gracefully (e.g. wait for threads to terminate).
- Return type:
None
- property state#
A named tuple of attributes that can be used within threads.
- class yapw.clients.Blocking(**kwargs)[source]#
Uses Pika’s BlockingConnection adapter.
- Parameters:
kwargs (Any) –
- __init__(**kwargs)[source]#
Connect to RabbitMQ, create a channel, set the prefetch count, and declare an exchange, unless using the default exchange.
- Parameters:
kwargs (Any) –
- connection: T#
The connection.
- channel: BlockingChannel#
The channel.
- declare_queue(queue, routing_keys=None, arguments=None)[source]#
Declare a queue, and bind it to the exchange with the routing keys. If no routing keys are provided, the queue is bound to the exchange using its name as the routing key.
- consume(on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None)[source]#
Declare a queue, bind it to the exchange with the routing keys, and start consuming messages from that queue.
If no routing_keys are provided, the queue is bound to the exchange using its name as the routing key.
Run the consumer callback in separate threads, to not block the IO loop. Add signal handlers to wait for threads to terminate.
The consumer callback is a function that accepts (state, channel, method, properties, body) arguments. The state argument contains thread-safe attributes. The rest of the arguments are the same as pika.channel.Channel.basic_consume()’s on_message_callback.
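A consumer callback with the five-argument signature could be sketched as follows. The SimpleNamespace objects are stand-ins for what the client passes. The sketch assumes that the body arrives already decoded to a dict and that state exposes the exchange attribute (it is listed in __safe__). Message acknowledgement is omitted.

```python
from types import SimpleNamespace


def on_message(state, channel, method, properties, body):
    """Consumer callback with the documented five-argument signature."""
    # Assumption: the configured decoder returned a dict (the JSON default).
    total = sum(item["quantity"] for item in body["items"])
    # Returned only so the sketch can be inspected; a real callback
    # would act on the message instead.
    return f"{state.exchange}/{method.routing_key}: {total}"


# Stand-ins for the objects that the client passes to the callback.
state = SimpleNamespace(exchange="orders")
method = SimpleNamespace(routing_key="orders_new", delivery_tag=1)
body = {"items": [{"quantity": 2}, {"quantity": 3}]}
summary = on_message(state, None, method, None, body)
```

Because the callback runs in a separate thread, it should touch only the attributes available on state, not the client itself.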
- channel_cancel_callback(method)[source]#
Cancel the consumer, which causes the threads to terminate and the connection to close.
RabbitMQ uses basic.cancel if a channel is consuming a queue and the queue is deleted.
- class yapw.clients.Async(**kwargs)[source]#
Uses Pika’s AsyncioConnection adapter. Reconnects to RabbitMQ if the connection is closed unexpectedly or can’t be established.
Calling start() connects to RabbitMQ, adds signal handlers, and starts the IO loop.
The signal handlers cancel the consumer, if consuming and if the channel is open. Otherwise, they wait for threads to terminate and close the connection. This also occurs if the broker cancels the consumer or if the channel closes for any other reason.
Once the IO loop starts, the client creates a channel, sets the prefetch count, and declares the exchange. Once the exchange is declared, the exchange_declareok_callback() calls exchange_ready(). You must define this method in a subclass, to do any work you need.
If the connection becomes blocked or unblocked, the client’s blocked attribute is set to True or False, respectively. Your code can use this attribute to, for example, pause, buffer or reschedule deliveries.
If you subclass this client and add and mutate any attributes, override reset().
See also
If your code runs subprocesses, be familiar with asyncio’s Subprocesses.
If your code configures logging, see Dealing with handlers that block.
- Parameters:
kwargs (Any) –
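One way to use the blocked attribute is to buffer deliveries while the connection is blocked and flush them once it is unblocked. The class below is a hypothetical stand-in that mimics only that attribute and the two callbacks. It does not subclass Async, and its deliver() method and buffer and sent attributes are invented for illustration.

```python
class BufferingSketch:
    """Hypothetical stand-in that mimics only the ``blocked`` attribute."""

    def __init__(self):
        self.blocked = False
        self.buffer = []
        self.sent = []

    def deliver(self, message):
        # Hold messages back while the broker has blocked the connection.
        if self.blocked:
            self.buffer.append(message)
        else:
            self.sent.append(message)

    def connection_blocked_callback(self, connection=None, method=None):
        self.blocked = True

    def connection_unblocked_callback(self, connection=None, method=None):
        self.blocked = False
        # Flush in arrival order.
        pending, self.buffer = self.buffer, []
        for message in pending:
            self.deliver(message)

    def reset(self):
        # Clear the attributes this class added, before reconnecting.
        self.blocked = False
        self.buffer.clear()


client = BufferingSketch()
client.deliver("a")
client.connection_blocked_callback()
client.deliver("b")
client.deliver("c")
buffered_while_blocked = list(client.buffer)
client.connection_unblocked_callback()
```

The reset() method shows why overriding it matters: any attribute that the subclass adds and mutates, such as the buffer here, would otherwise carry stale state across a reconnection.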
- executor#
The thread pool executor.
- stopping#
Whether the client is being stopped deliberately.
- connect()[source]#
Connect to RabbitMQ, create a channel, set the prefetch count, and declare an exchange, unless using the default exchange.
- Return type:
None
- connection_blocked_callback(connection, method)[source]#
Mark the client as blocked.
Subclasses must implement any logic for pausing deliveries or filling buffers.
- Parameters:
connection (Connection) –
method (Any) –
- Return type:
None
- connection_unblocked_callback(connection, method)[source]#
Mark the client as unblocked.
Subclasses must implement any logic for resuming deliveries or clearing buffers.
- Parameters:
connection (Connection) –
method (Any) –
- Return type:
None
- reconnect()[source]#
Reconnect to RabbitMQ, unless a signal was received while the timer was running. If so, stop the IO loop.
- Return type:
None
- reset()[source]#
Reset the client’s state, before reconnecting.
Override this method in subclasses, if your subclass adds and mutates any attributes.
- Return type:
None
- connection_open_error_callback(connection, error)[source]#
Retry, if the connection couldn’t be established.
- Parameters:
connection (Connection) –
- Return type:
None
- connection_close_callback(connection, reason)[source]#
Reconnect, if the connection was closed unexpectedly. Otherwise, stop the IO loop.
- Parameters:
connection (Connection) –
reason (Exception) –
- Return type:
None
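The reconnect decision described for reconnect() and connection_close_callback() can be modelled with a plain asyncio timer. ReconnectSketch is a hypothetical stand-in: the delay, the events list, and the unexpected argument are assumptions made for this sketch, and only the stopping flag corresponds to a documented attribute.

```python
import asyncio


class ReconnectSketch:
    """Hypothetical stand-in; it models only the reconnect decision."""

    def __init__(self, loop, delay=0.01):
        self.loop = loop
        self.delay = delay
        self.stopping = False
        self.events = []

    def connection_close_callback(self, unexpected):
        if unexpected and not self.stopping:
            self.events.append("timer started")
            self.loop.call_later(self.delay, self.reconnect)
        else:
            self.events.append("stopped")
            self.loop.stop()

    def reconnect(self):
        if self.stopping:
            # A signal arrived while the timer was running.
            self.events.append("stopped")
        else:
            self.events.append("reconnected")
        # Stop in both cases to end the demonstration;
        # a real client keeps running after it reconnects.
        self.loop.stop()


def run(signal_during_timer):
    loop = asyncio.new_event_loop()
    try:
        client = ReconnectSketch(loop)
        loop.call_soon(client.connection_close_callback, True)
        if signal_during_timer:
            # Simulate a signal handler marking the client as stopping.
            loop.call_soon(setattr, client, "stopping", True)
        loop.run_forever()
        return client.events
    finally:
        loop.close()
```

Calling run(False) follows the reconnect path, and run(True) follows the path where a signal interrupts the wait.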
- interrupt()[source]#
Cancel the consumer if consuming and if the channel is open. Otherwise, wait for threads to terminate and close the connection.
- Return type:
None
- connection_open_callback(connection)[source]#
Open a channel, once the connection is open.
- Parameters:
connection (Connection) –
- Return type:
None
- channel_open_callback(channel)[source]#
Set the prefetch count, once the channel is open.
- Parameters:
channel (Channel) –
- Return type:
None
- channel_cancelok_callback(method)[source]#
Close the channel, once the consumer is cancelled. The channel_close_callback() closes the connection.
- Parameters:
method (pika.frame.Method[pika.spec.Basic.CancelOk]) –
- Return type:
Any
- channel_close_callback(channel, reason)[source]#
Close the connection, once the client cancelled the consumer or once RabbitMQ closed the channel due to, e.g., redeclaring exchanges with inconsistent parameters.
A warning is logged, in case it was the latter.
- channel_qosok_callback(method)[source]#
Declare the exchange, once the prefetch count is set.
- Parameters:
method (pika.frame.Method[pika.spec.Basic.QosOk]) –
- Return type:
None
- exchange_declareok_callback(method)[source]#
Perform user-specified actions, once the exchange is declared.
- Parameters:
method (pika.frame.Method[pika.spec.Exchange.DeclareOk]) –
- Return type:
None
- class yapw.clients.AsyncConsumer(*, on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None, **kwargs)[source]#
An asynchronous consumer, extending Async.
After calling start(), this client declares the queue, binds it to the exchange with the routing_keys, and starts consuming messages from that queue, using the on_message_callback.
The on_message_callback and queue keyword arguments are required. If no routing_keys are provided, the queue is bound to the exchange using its name as the routing key.
The pika.channel.Channel.basic_consume() call sets its callback to an empty method, channel_consumeok_callback(). Define this method in a subclass, if needed.
- Parameters:
- __init__(*, on_message_callback, queue, routing_keys=None, decorator=<function halt>, arguments=None, **kwargs)[source]#
See also
- Parameters:
- Return type:
None
- queue#
The queue’s name.
- routing_keys#
The queue’s routing keys.
- arguments#
The arguments parameter to the queue_declare method.
- on_message_callback#
The consumer callback.
- decorator#
The decorator of the consumer callback.
- queue_declareok_callback(method, queue_name)[source]#
Bind the queue to the first routing key, once the queue is declared.
- Parameters:
method (pika.frame.Method[pika.spec.Queue.DeclareOk]) –
queue_name (str) –
- Return type:
None
- queue_bindok_callback(method, queue_name, index)[source]#
Bind the queue to the remaining routing keys, or start consuming if all routing keys are bound.
- Parameters:
method (pika.frame.Method[pika.spec.Queue.BindOk]) –
queue_name (str) –
index (int) –
- Return type:
None
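The chain from queue_declareok_callback() through queue_bindok_callback() to consuming can be simulated as below. This is a simplified model, not yapw's implementation. FakeChannel is hypothetical, calls its callbacks immediately, and uses shortened argument lists that do not match Pika's channel methods.

```python
class FakeChannel:
    """Records calls and invokes callbacks immediately (a real channel is asynchronous)."""

    def __init__(self):
        self.calls = []

    def queue_bind(self, queue, routing_key, callback):
        self.calls.append(("bind", queue, routing_key))
        callback()

    def basic_consume(self, queue):
        self.calls.append(("consume", queue))


class BindingSketch:
    """Hypothetical model of binding one routing key per callback."""

    def __init__(self, channel, routing_keys):
        self.channel = channel
        self.routing_keys = routing_keys

    def queue_declareok_callback(self, queue_name):
        # Bind the first routing key, once the queue is declared.
        self._bind(queue_name, 0)

    def queue_bindok_callback(self, queue_name, index):
        if index + 1 < len(self.routing_keys):
            # Bind the next routing key.
            self._bind(queue_name, index + 1)
        else:
            # All routing keys are bound.
            self.channel.basic_consume(queue_name)

    def _bind(self, queue_name, index):
        self.channel.queue_bind(
            queue_name,
            self.routing_keys[index],
            callback=lambda: self.queue_bindok_callback(queue_name, index),
        )


channel = FakeChannel()
BindingSketch(channel, ["a", "b"]).queue_declareok_callback("q")
```

Passing the index through each callback lets a single method handle every bind confirmation without storing a counter on the client.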
- consume(on_message_callback, decorator, queue_name)[source]#
Start consuming messages from the queue.
Run the consumer callback in separate threads, to not block the IO loop. (This assumes the consumer callback is CPU-bound.) Add signal handlers to wait for threads to terminate.
The consumer callback is a function that accepts (state, channel, method, properties, body) arguments. The state argument contains thread-safe attributes. The rest of the arguments are the same as pika.channel.Channel.basic_consume()’s on_message_callback.
- Parameters:
on_message_callback (yapw.types.ConsumerCallback) –
decorator (yapw.types.Decorator) –
queue_name (str) –
- Return type:
None
- channel_cancel_callback(method)[source]#
Close the channel.
RabbitMQ uses basic.cancel if a channel is consuming a queue and the queue is deleted.
- channel_consumeok_callback(method)[source]#
Override this method in subclasses to perform any other work, once the consumer is started.
- Parameters:
method (pika.frame.Method[pika.spec.Basic.ConsumeOk]) –
- Return type:
None