Source code for yapw.methods

"""
Functions for calling channel methods from a consumer callback running in another thread.
"""

import functools
import logging
from typing import Any

import pika

from yapw.types import State
from yapw.util import basic_publish_debug_args, basic_publish_kwargs

logger = logging.getLogger(__name__)


[docs] def publish( state: State[Any], channel: pika.channel.Channel, message: Any, routing_key: str, *args: Any, **kwargs: Any ) -> None: """ Publish the ``message`` with the ``routing_key``, to the exchange set by the ``state``. :param state: an object with thread-safe attributes from the client :param channel: the channel from which to call ``basic_publish`` :param message: a decoded message :param routing_key: the routing key """ keywords = basic_publish_kwargs(state, message, routing_key) keywords.update(kwargs) # type: ignore[typeddict-item] # PEP 692 disallows updating TypedDict with Dict _channel_method_from_thread(state.connection, channel, "publish", *args, **keywords) logger.debug(*basic_publish_debug_args(channel, message, keywords))
[docs] def ack(state: State[Any], channel: pika.channel.Channel, delivery_tag: int | None = 0, **kwargs: bool) -> None: """ Ack a message by its delivery tag. :param state: an object with thread-safe attributes from the client :param channel: the channel from which to call ``basic_ack`` :param delivery_tag: the delivery tag """ _channel_method_from_thread(state.connection, channel, "ack", delivery_tag, **kwargs) logger.debug("Ack'd message on channel %s with delivery tag %s", channel.channel_number, delivery_tag)
[docs] def nack(state: State[Any], channel: pika.channel.Channel, delivery_tag: int | None = 0, **kwargs: bool) -> None: """ Nack a message by its delivery tag. :param state: an object with thread-safe attributes from the client :param channel: the channel from which to call ``basic_nack`` :param delivery_tag: the delivery tag """ _channel_method_from_thread(state.connection, channel, "nack", delivery_tag, **kwargs) logger.debug("Nack'd message on channel %s with delivery tag %s", channel.channel_number, delivery_tag)
def _channel_method_from_thread( connection: Any, channel: pika.channel.Channel, method: str, *args: Any, **kwargs: Any ) -> None: if connection.is_open: cb = functools.partial(_channel_method_from_main, channel, method, *args, **kwargs) add_callback_threadsafe(connection, cb) else: logger.error("Can't %s as connection is closed or closing", method) def _channel_method_from_main(channel: pika.channel.Channel, method: str, *args: Any, **kwargs: Any) -> None: if channel.is_open: getattr(channel, f"basic_{method}")(*args, **kwargs) else: logger.error("Can't %s as channel is closed or closing", method)
[docs] def add_callback_threadsafe(connection: Any, callback: Any) -> None: """ Interact with Pika from another thread. :param connection: a RabbitMQ connection :param callback: the callback to add """ # One branch per adapter. if hasattr(connection, "ioloop"): connection.ioloop.call_soon_threadsafe(callback) else: connection.add_callback_threadsafe(callback)