Yet Another Pika Wrapper 0.1.4¶
Yet Another Pika Wrapper (yapw – pronounced yapu) makes it easier to create a Pika client that:
Runs consumer callbacks in separate threads.
Automatically decodes and encodes messages.
Has error handling, signal handling, useful logging and good defaults.
Configure a RabbitMQ client¶
Import a client class, for example:
from yapw.clients import Blocking
from yapw.clients import AsyncConsumer
# Or, if the client will exclusively publish messages:
from yapw.clients import Async
Publish messages outside a consumer callback¶
With the Blocking client:

    from yapw.clients import Blocking

    publisher = Blocking(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
    publisher.publish({"message": "value"}, routing_key="messages")
    publisher.close()

With the Async client:

    from yapw.clients import Async


    class Publisher(Async):
        def exchange_ready(self):
            self.publish({"message": "value"}, routing_key="messages")
            self.interrupt()  # only if you want to stop the client


    publisher = Publisher(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
    publisher.start()

The routing key is namespaced by the exchange name, to make it “myexchange_messages”.
Consume messages¶
With the Blocking client:

    from yapw.clients import Blocking
    from yapw.decorators import discard
    from yapw.methods import ack, nack, publish


    def callback(state, channel, method, properties, body):
        try:
            key = body["key"]
            # do work
            publish(state, channel, {"message": "value"}, "myroutingkey")
        except KeyError:
            nack(state, channel, method.delivery_tag)
        else:
            ack(state, channel, method.delivery_tag)


    consumer = Blocking(
        url="amqp://user:pass@127.0.0.1",
        exchange="myexchange",
        prefetch_count=5,
    )
    consumer.consume(callback, queue="messages", decorator=discard)

With the AsyncConsumer client:

    from yapw.clients import AsyncConsumer
    from yapw.decorators import discard
    from yapw.methods import ack, nack, publish


    def callback(state, channel, method, properties, body):
        try:
            key = body["key"]
            # do work
            publish(state, channel, {"message": "value"}, "myroutingkey")
        except KeyError:
            nack(state, channel, method.delivery_tag)
        else:
            ack(state, channel, method.delivery_tag)


    consumer = AsyncConsumer(
        url="amqp://user:pass@127.0.0.1",
        exchange="myexchange",
        prefetch_count=5,
        on_message_callback=callback,
        queue="messages",
        decorator=discard,
    )
    consumer.start()

yapw implements a pattern whereby the consumer declares and binds a queue. By default, the queue’s name and binding key are the same, and are namespaced by the exchange name.
To manually set the binding keys:
    consumer.consume(
        callback,
        queue="messages",
        routing_keys=["a", "b"],
        decorator=discard,
    )

    consumer = AsyncConsumer(
        # ...
        on_message_callback=callback,
        queue="messages",
        routing_keys=["a", "b"],
        decorator=discard,
    )

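To illustrate the defaults described above, the sketch below models which queue name and binding keys a consumer would end up with. It is not yapw's code: `plan_bindings` is a hypothetical helper, and it assumes that manually set binding keys are namespaced by the exchange name in the same way as the queue name.

```python
from typing import List, Optional, Tuple


def plan_bindings(
    exchange: str, queue: str, routing_keys: Optional[List[str]] = None
) -> Tuple[str, List[str]]:
    """Return the (queue name, binding keys) a consumer would declare and bind."""
    # With no explicit keys, the binding key defaults to the queue name.
    keys = routing_keys or [queue]
    # Assumption: every name is namespaced as "<exchange>_<name>".
    return f"{exchange}_{queue}", [f"{exchange}_{key}" for key in keys]


default_plan = plan_bindings("myexchange", "messages")
manual_plan = plan_bindings("myexchange", "messages", ["a", "b"])
```

Under these assumptions, the default plan binds the queue "myexchange_messages" with a single key of the same name, while the manual plan binds it with one key per entry in `routing_keys`.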
yapw uses a thread pool to run the consumer callback in separate threads.
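As a rough illustration of this pattern (not yapw's actual implementation), the sketch below hands each delivered message to a `concurrent.futures.ThreadPoolExecutor`, so that the callback runs in a worker thread while the main thread stays free to service the connection. The message list and the one-argument `callback` are simplified stand-ins.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback(body):
    """Stand-in consumer callback: report which thread processed the message."""
    return body["key"], threading.current_thread().name


messages = [{"key": 1}, {"key": 2}, {"key": 3}]

# Each message is submitted to the pool instead of being handled inline.
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
    futures = [executor.submit(callback, body) for body in messages]
    results = [future.result() for future in futures]

keys = [key for key, _ in results]
thread_names = {name for _, name in results}
```

Every entry in `thread_names` belongs to the pool, which is why a slow callback does not block the thread that owns the connection.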
See also yapw.clients for details on the consumer callback function signature.
Channel methods¶
The ack(), nack() and publish() functions are safe to call from a consumer callback running in another thread. They log an error if the connection or channel isn’t open.
Note
Thread-safe helper functions have not yet been defined for all relevant channel methods.
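Helpers like these are needed because Pika channels are not thread-safe: a worker thread has to ask the connection's own thread to perform the channel operation. With Pika's `BlockingConnection`, this is typically done through `add_callback_threadsafe()`. The sketch below models the idea without RabbitMQ. `FakeConnection` and `FakeChannel` are hypothetical stand-ins, the `ack` function here is not yapw's, and yapw's helpers may differ in detail.

```python
import functools
import queue
import threading


class FakeChannel:
    """Stand-in for a Pika channel; records acks and the thread that made them."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append((delivery_tag, threading.current_thread().name))


class FakeConnection:
    """Stand-in for a connection that runs scheduled callbacks in its own thread."""

    def __init__(self):
        self._callbacks = queue.Queue()

    def add_callback_threadsafe(self, callback):
        # Safe to call from any thread: it only enqueues the work.
        self._callbacks.put(callback)

    def process_callbacks(self):
        # Called from the connection's thread to run the queued work.
        while not self._callbacks.empty():
            self._callbacks.get()()


def ack(connection, channel, delivery_tag):
    """Schedule the ack on the connection's thread instead of calling it directly."""
    connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag))


connection, channel = FakeConnection(), FakeChannel()
worker = threading.Thread(target=ack, args=(connection, channel, 7), name="worker")
worker.start()
worker.join()
connection.process_callbacks()  # runs in the thread that owns the connection
```

The worker thread only schedules the acknowledgement; `basic_ack` itself runs in the thread that calls `process_callbacks()`.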
Encoding and decoding¶
By default, when publishing messages, the content type “application/json” is used, and the message body is encoded using the default_encode() function, which serializes to JSON-formatted bytes when the content type is “application/json”.
Similarly, when consuming messages, the default_decode() function is used, which deserializes from JSON-formatted bytes when the consumed message’s content type is “application/json”. (That is how the sample code above can read body["key"] without decoding.)
You can change this behavior. For example, change the bodies of the encode and decode functions below:
    import json

    from yapw.clients import Blocking


    # Return bytes.
    def encode(message, content_type):
        if content_type == "application/json":
            return json.dumps(message).encode()
        return message


    # Accept body as bytes.
    def decode(body, content_type):
        if content_type == "application/json":
            return json.loads(body)
        return body


    client = Blocking(encode=encode, decode=decode)

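As a quick sanity check on custom functions of this shape, encoding and then decoding a message should return the original value. The sketch below uses the same `(message, content_type)` and `(body, content_type)` parameters as above and adds a hypothetical fallback that treats any other content type as text. The fallback is an illustration only, not yapw's default behavior.

```python
import json


def encode(message, content_type):
    """Return bytes for the given message."""
    if content_type == "application/json":
        return json.dumps(message).encode()
    # Hypothetical fallback: treat anything else as text.
    return str(message).encode()


def decode(body, content_type):
    """Accept bytes and return the decoded message."""
    if content_type == "application/json":
        return json.loads(body)
    return body.decode()


payload = {"message": "value"}
encoded = encode(payload, "application/json")
round_trip = decode(encoded, "application/json")
text = decode(encode("hello", "text/plain"), "text/plain")
```

If `round_trip` differs from `payload`, the two functions disagree about the wire format and consumers will see different data than publishers sent.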
Error handling¶
The decorator keyword argument to the consume() method is a function that wraps the consumer callback (the first argument to the consume method). This function can be used to:
Offer conveniences to the consumer callback, like decoding the message body
Handle unexpected errors from the consumer callback
Note
For the AsyncConsumer client, the decorator and on_message_callback arguments are passed to its __init__() method.
When using consumer prefetch, if a message is not ack’d or nack’d, then RabbitMQ stops delivering messages. As such, it’s important to handle unexpected errors by either acknowledging the message or halting the process. Otherwise, the process will stall.
The default decorator is the yapw.decorators.halt() function, which shuts down the client in the main thread, without acknowledging the message. See the available decorators and the rationale for the default setting.
All decorators also decode the message body, which can be configured as above. If an exception occurs while decoding, the decorator shuts down the client in the main thread, without acknowledging the message.
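To make the decorator's role concrete, here is a simplified, library-free model. The function name `nack_on_error`, the argument order (`decode`, `callback`, a `nack` function, then the callback's own arguments) and the use of plain dicts for `method` and `properties` are all assumptions made for illustration; check yapw.decorators for the real signatures. The model decodes the body, runs the callback, and nacks the message if the callback raises, so that an unexpected error does not leave the message unacknowledged.

```python
import json


def nack_on_error(decode, callback, nack, state, channel, method, properties, body):
    """Hypothetical decorator: decode the body, run the callback, nack on failure."""
    message = decode(body, properties["content_type"])
    try:
        callback(state, channel, method, properties, message)
    except Exception:
        # Reject the message so the broker is not left waiting on it.
        nack(method["delivery_tag"])


def decode(body, content_type):
    """Deserialize JSON bodies; pass anything else through unchanged."""
    return json.loads(body) if content_type == "application/json" else body


def callback(state, channel, method, properties, body):
    state.append(body["key"])  # raises KeyError if "key" is missing


seen, nacked = [], []
properties = {"content_type": "application/json"}  # dict stand-in for Pika's properties
nack_on_error(decode, callback, nacked.append, seen, None, {"delivery_tag": 1}, properties, b'{"key": "a"}')
nack_on_error(decode, callback, nacked.append, seen, None, {"delivery_tag": 2}, properties, b'{"other": "b"}')
```

The first message is processed normally; the second raises `KeyError` inside the callback and is nacked by the wrapper rather than by the callback itself.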
Signal handling¶
Every client shuts down gracefully if it receives the SIGTERM (system exit) or SIGINT (keyboard interrupt) signals. It stops consuming messages, waits for threads to terminate, and closes the RabbitMQ connection.
Signal handlers are added only if a client is instantiated in the main thread, because only the main thread is allowed to set a new signal handler. An example of a non-main thread is a thread that handles a web request.
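The main-thread restriction comes from Python itself: `signal.signal()` raises `ValueError` when called from any other thread. A minimal sketch of the check, assuming a hypothetical `install_signal_handlers` helper (not part of yapw):

```python
import signal
import threading


def install_signal_handlers(handler):
    """Install SIGTERM/SIGINT handlers in the main thread; report whether it did."""
    if threading.current_thread() is not threading.main_thread():
        # signal.signal() would raise ValueError here, so skip installation.
        return False
    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    return True


def handler(signum, frame):
    print(f"received signal {signum}, shutting down")


# Remember the current handlers so this demo can restore them afterwards.
previous = {sig: signal.getsignal(sig) for sig in (signal.SIGTERM, signal.SIGINT)}

in_main = threading.current_thread() is threading.main_thread()
installed_here = install_signal_handlers(handler)

results = []
thread = threading.Thread(target=lambda: results.append(install_signal_handlers(handler)))
thread.start()
thread.join()

# Restore the original handlers (only possible from the main thread).
if in_main:
    for sig, old in previous.items():
        if old is not None:
            signal.signal(sig, old)
```

The call made from the extra thread returns `False` without touching the handlers, which mirrors the case of a client created while serving a web request.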
Copyright (c) 2021 Open Contracting Partnership, released under the BSD license