# Source code for yapw.util

import json
import pickle
from typing import TYPE_CHECKING, Any, Union

import pika

# orjson is an optional dependency: use it when it is installed, otherwise
# fall back to the standard library's json module.
try:
    import orjson

    jsonlib = orjson
except ImportError:
    jsonlib = json  # type: ignore[misc]

from yapw.types import PublishKeywords, State

# ``Base`` is referenced only inside a string annotation in this module, so it
# is imported for type checkers only and never at runtime.
if TYPE_CHECKING:
    from yapw.clients import Base

def json_dumps(message: Any) -> bytes:
    """
    Serialize a Python object to JSON formatted bytes.

    Uses `orjson <https://pypi.org/project/orjson/>`__ if available.

    :param message: a Python object
    :returns: JSON formatted bytes
    """
    if jsonlib is json:
        # Standard-library fallback: compact separators drop the default
        # whitespace after "," and ":", and the resulting str is encoded to bytes.
        return json.dumps(message, separators=(",", ":")).encode()
    # orjson.dumps returns bytes directly, so no encoding step is needed.
    return orjson.dumps(message)
[docs] def default_decode(body: bytes, content_type: str | None) -> Any: """ If the content type is "application/json", deserializes the JSON formatted bytes to a Python object. Otherwise, returns the bytes (which the consumer callback can deserialize independently). Uses `orjson <>`__ if available. :param body: the encoded message :param content_type: the message's content type :returns: a Python object """ if content_type == "application/json": return jsonlib.loads(body) return body
def default_encode(message: Any, content_type: str) -> bytes:
    """
    Encode the decoded message to bytes.

    -  If the content type is "application/json", serialize the message to JSON formatted bytes.
    -  If the message is a string, encode it to bytes.
    -  If the message is bytes, return it.
    -  Otherwise, serialize the message to its pickled representation.

    :param message: a decoded message
    :param content_type: the message's content type
    :returns: an encoded message
    """
    # The content type takes precedence: a str or bytes message declared as
    # JSON is still serialized as JSON rather than passed through.
    if content_type == "application/json":
        return json_dumps(message)
    if isinstance(message, str):
        return message.encode()
    if isinstance(message, bytes):
        return message
    # NOTE(review): this only pickles outgoing data; whoever unpickles these
    # bytes must trust the publisher.
    return pickle.dumps(message)
def basic_publish_kwargs(state: Union["Base[Any]", State[Any]], message: Any, routing_key: str) -> PublishKeywords:
    """
    Prepare keyword arguments for ``basic_publish``.

    :param state: an object with the attributes ``format_routing_key``, ``exchange``, ``encode``, ``content_type``
                  and ``delivery_mode``
    :param message: a decoded message
    :param routing_key: the routing key
    :returns: keyword arguments for ``basic_publish``
    """
    formatted = state.format_routing_key(routing_key)

    # The same content type drives both the encoding of the body and the
    # message properties, so the two always agree.
    body = state.encode(message, state.content_type)
    properties = pika.BasicProperties(content_type=state.content_type, delivery_mode=state.delivery_mode)

    return {"exchange": state.exchange, "routing_key": formatted, "body": body, "properties": properties}
def basic_publish_debug_args(
    channel: pika.channel.Channel | pika.adapters.blocking_connection.BlockingChannel,
    message: Any,
    keywords: PublishKeywords,
) -> tuple[str, Any, int, str, str]:
    """
    Prepare arguments for ``logger.debug`` related to publishing a message.

    The first element is a %-style format string and the remaining elements are its arguments, so the
    result can be unpacked directly: ``logger.debug(*basic_publish_debug_args(...))``.

    :param channel: the channel from which to call ``basic_publish``
    :param message: a decoded message
    :param keywords: keyword arguments for ``basic_publish``
    :returns: arguments for ``logger.debug``
    """
    return (
        "Published message %r on channel %s to exchange %s with routing key %s",
        message,
        channel.channel_number,
        keywords["exchange"],
        keywords["routing_key"],
    )