Decorators#
Decorators to be used with consumer callbacks.
A message must be ack’d or nack’d if using consumer prefetch, because otherwise RabbitMQ stops delivering messages. The decorators help to ensure that, in case of error, either the message is nack’d or the process is halted.
halt()
is the default decorator. It stops the consumer and halts the process, so that an
administrator can decide when it is appropriate to restart it.
The other decorators require more care. For example, if a callback inserts messages into a database, and the database
is down, but this exception isn’t handled by the callback, then the discard()
or
requeue()
decorators would end up nack’ing all messages in the queue.
Decorators look like this (see the decorate()
function for context):
from yapw.decorators import decorate
def myfunction(decode, callback, state, channel, method, properties, body):
def errback(exception):
# do something, like halting the process or nack'ing the message
decorate(decode, callback, state, channel, method, properties, body, errback)
- yapw.decorators.decorate(decode, callback, state, channel, method, properties, body, errback, finalback=None)[source]#
Use this function to define your own decorators.
Decode the message
body
using thedecode
function, and call the consumercallback
.If the
callback
function raises an exception, call theerrback
function. In any case, call thefinalback
function after calling thecallback
function.If the
decode
function raises an exception, shut down the client in the main thread.See also
yapw.clients
for details on the consumer callback function signature.- Parameters:
decode (Decode) –
callback (ConsumerCallback) –
state (State[Any]) –
channel (pika.channel.Channel) –
method (pika.spec.Basic.Deliver) –
properties (pika.BasicProperties) –
body (bytes) –
errback (Callable[[Exception], None]) –
finalback (Callable[[], None] | None) –
- Return type:
None
- yapw.decorators.halt(decode, callback, state, channel, method, properties, body)[source]#
If the
callback
function raises an exception, shut down the client in the main thread, without acknowledgment.- Parameters:
decode (Decode) –
callback (ConsumerCallback) –
state (State[Any]) –
channel (pika.channel.Channel) –
method (pika.spec.Basic.Deliver) –
properties (pika.BasicProperties) –
body (bytes) –
- Return type:
None
- yapw.decorators.discard(decode, callback, state, channel, method, properties, body)[source]#
If the
callback
function raises an exception, nack the message, without requeueing.- Parameters:
decode (Decode) –
callback (ConsumerCallback) –
state (State[Any]) –
channel (pika.channel.Channel) –
method (pika.spec.Basic.Deliver) –
properties (pika.BasicProperties) –
body (bytes) –
- Return type:
None
- yapw.decorators.requeue(decode, callback, state, channel, method, properties, body)[source]#
If the
callback
function raises an exception, nack the message, requeueing it unless it was redelivered.- Parameters:
decode (Decode) –
callback (ConsumerCallback) –
state (State[Any]) –
channel (pika.channel.Channel) –
method (pika.spec.Basic.Deliver) –
properties (pika.BasicProperties) –
body (bytes) –
- Return type:
None