import json
import pickle
from typing import TYPE_CHECKING, Any, Union
import pika
try:
import orjson
jsonlib = orjson
except ImportError:
jsonlib = json # type: ignore[misc]
from yapw.types import PublishKeywords, State
if TYPE_CHECKING:
from yapw.clients import Base
[docs]
def json_dumps(message: Any) -> bytes:
"""
Serialize a Python object to JSON formatted bytes.
Uses `orjson <https://pypi.org/project/orjson/>`__ if available.
:param message: a Python object
:returns: JSON formatted bytes
"""
if jsonlib == json:
return json.dumps(message, separators=(",", ":")).encode()
return orjson.dumps(message)
[docs]
def default_decode(body: bytes, content_type: str | None) -> Any:
"""
If the content type is "application/json", deserializes the JSON formatted bytes to a Python object. Otherwise,
returns the bytes (which the consumer callback can deserialize independently).
Uses `orjson <https://pypi.org/project/orjson/>`__ if available.
:param body: the encoded message
:param content_type: the message's content type
:returns: a Python object
"""
if content_type == "application/json":
return jsonlib.loads(body)
return body
[docs]
def default_encode(message: Any, content_type: str) -> bytes:
"""
Encode the decoded message to bytes.
- If the content type is "application/json", serialize the message to JSON formatted bytes.
- If the message is a string, encode it to bytes.
- If the message is bytes, return it.
- Otherwise, serialize the message to its picked representation.
:param message: a decoded message
:param content_type: the message's content type
:returns: an encoded message
"""
if content_type == "application/json":
return json_dumps(message)
if isinstance(message, str):
return message.encode()
if isinstance(message, bytes):
return message
return pickle.dumps(message)
[docs]
def basic_publish_kwargs(state: Union["Base[Any]", State[Any]], message: Any, routing_key: str) -> PublishKeywords:
"""
Prepare keyword arguments for ``basic_publish``.
:param state: an object with the attributes ``format_routing_key``, ``exchange``, ``encode``, ``content_type`` and
``delivery_mode``
:param message: a decoded message
:param routing_key: the routing key
:returns: keyword arguments for ``basic_publish``
"""
formatted = state.format_routing_key(routing_key)
body = state.encode(message, state.content_type)
properties = pika.BasicProperties(content_type=state.content_type, delivery_mode=state.delivery_mode)
return {"exchange": state.exchange, "routing_key": formatted, "body": body, "properties": properties}
[docs]
def basic_publish_debug_args(
channel: pika.channel.Channel | pika.adapters.blocking_connection.BlockingChannel,
message: Any,
keywords: PublishKeywords,
) -> tuple[str, Any, int, str, str]:
"""
Prepare arguments for ``logger.debug`` related to publishing a message.
:param channel: the channel from which to call ``basic_publish``
:param message: a decoded message
:param keywords: keyword arguments for ``basic_publish``
:returns: arguments for ``logger.debug``
"""
return (
"Published message %r on channel %s to exchange %s with routing key %s",
message,
channel.channel_number,
keywords["exchange"],
keywords["routing_key"],
)