Source code for yapw.decorators

"""
Decorators to be used with consumer callbacks.

A message must be ack'd or nack'd if using `consumer prefetch <https://www.rabbitmq.com/consumer-prefetch.html>`__,
because otherwise `RabbitMQ stops delivering messages <https://www.rabbitmq.com/confirms.html#channel-qos-prefetch>`__.
The decorators help to ensure that, in case of error, either the message is nack'd or the process is halted.

:func:`~yapw.decorators.halt` is the default decorator. It stops the consumer and halts the process, so that an
administrator can decide when it is appropriate to restart it.

The other decorators require more care. For example, if a callback inserts messages into a database, and the database
is down, but this exception isn't handled by the callback, then the :func:`~yapw.decorators.discard` or
:func:`~yapw.decorators.requeue` decorators would end up nack'ing all messages in the queue.

Decorators look like this (see the :func:`~yapw.decorators.decorate` function for context):

.. code-block:: python

   from yapw.decorators import decorate


   def myfunction(decode, callback, state, channel, method, properties, body):
       def errback(exception):
           # do something, like halting the process or nack'ing the message

       decorate(decode, callback, state, channel, method, properties, body, errback)
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from yapw.methods import add_callback_threadsafe, nack

if TYPE_CHECKING:
    from collections.abc import Callable

    import pika

    from yapw.types import ConsumerCallback, Decode, State

logger = logging.getLogger(__name__)


[docs] def decorate( decode: Decode, callback: ConsumerCallback, state: State[Any], channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.BasicProperties, body: bytes, errback: Callable[[Exception], None], finalback: Callable[[], None] | None = None, ) -> None: """ Use this function to define your own decorators. Decode the message ``body`` using the ``decode`` function, and call the consumer ``callback``. If the ``callback`` function raises an exception, call the ``errback`` function. In any case, call the ``finalback`` function after calling the ``callback`` function. If the ``decode`` function raises an exception, shut down the client in the main thread. .. seealso:: :mod:`yapw.clients` for details on the consumer callback function signature. """ logger.debug( "Received message %s with routing key %s and delivery tag %s", body, method.routing_key, method.delivery_tag ) try: message = decode(body, properties.content_type) try: callback(state, channel, method, properties, message) except Exception as exception: # noqa: BLE001 errback(exception) finally: if finalback: finalback() except Exception: logger.exception("%r can't be decoded, shutting down gracefully", body) add_callback_threadsafe(state.connection, state.interrupt)
# https://stackoverflow.com/a/7099229/244258
[docs] def halt( decode: Decode, callback: ConsumerCallback, state: State[Any], channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.BasicProperties, body: bytes, ) -> None: """ If the ``callback`` function raises an exception, shut down the client in the main thread, without acknowledgment. """ def errback(exception: Exception) -> None: logger.exception("Unhandled exception when consuming %r, shutting down gracefully", body) add_callback_threadsafe(state.connection, state.interrupt) decorate(decode, callback, state, channel, method, properties, body, errback)
[docs] def discard( decode: Decode, callback: ConsumerCallback, state: State[Any], channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.BasicProperties, body: bytes, ) -> None: """ If the ``callback`` function raises an exception, nack the message, without requeueing. """ def errback(exception: Exception) -> None: logger.exception("Unhandled exception when consuming %r, discarding message", body) nack(state, channel, method.delivery_tag, requeue=False) decorate(decode, callback, state, channel, method, properties, body, errback)
[docs] def requeue( decode: Decode, callback: ConsumerCallback, state: State[Any], channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.BasicProperties, body: bytes, ) -> None: """ If the ``callback`` function raises an exception, nack the message, requeueing it unless it was redelivered. """ def errback(exception: Exception) -> None: requeue = not method.redelivered logger.exception("Unhandled exception when consuming %r (requeue=%r)", body, requeue) nack(state, channel, method.delivery_tag, requeue=requeue) decorate(decode, callback, state, channel, method, properties, body, errback)