import sys
from collections.abc import Callable
from typing import Any, Generic, NamedTuple, TypedDict, TypeVar
import pika
T = TypeVar("T")
Encode = Callable[[Any, str], bytes]
Decode = Callable[[bytes, str | None], Any]
if sys.version_info >= (3, 11):
class State(NamedTuple, Generic[T]):
"""
Attributes that can be used safely in consumer callbacks.
"""
#: A function to format the routing key.
format_routing_key: Callable[[str], str]
#: A function to shut down the client.
interrupt: Callable[[T], None]
#: The connection.
connection: T
#: The exchange name.
exchange: str
#: The message bodies' encoder.
encode: Encode
#: The messages' content type.
content_type: str
#: The messages' delivery mode.
delivery_mode: int
else:
[docs]
class State(NamedTuple):
"""
Attributes that can be used safely in consumer callbacks.
"""
#: A function to format the routing key.
format_routing_key: Callable[[str], str]
#: A function to shut down the client.
interrupt: Callable[[T], None]
#: The connection.
connection: T
#: The exchange name.
exchange: str
#: The message bodies' encoder.
encode: Encode
#: The messages' content type.
content_type: str
#: The messages' delivery mode.
delivery_mode: int
[docs]
class PublishKeywords(TypedDict, total=False):
"""
Keyword arguments for ``basic_publish``.
"""
#: The exchange to publish to.
exchange: str
#: The message's routing key.
routing_key: str
#: The message's body.
body: bytes
#: The message's content type and delivery mode.
properties: pika.BasicProperties
ConsumerCallback = Callable[
[State[Any], pika.channel.Channel, pika.spec.Basic.Deliver, pika.BasicProperties, Any],
None,
]
Decorator = Callable[
[Decode, ConsumerCallback, State[Any], pika.channel.Channel, pika.spec.Basic.Deliver, pika.BasicProperties, bytes],
None,
]