Yet Another Pika Wrapper 0.1.4

Yet Another Pika Wrapper (yapw – pronounced yapu) makes it easier to create a Pika client that:

  • Runs consumer callbacks in separate threads.

  • Automatically decodes and encodes messages.

  • Has error handling, signal handling, useful logging and good defaults.

(If you are viewing this on GitHub, open the full documentation for additional details.)

Configure a RabbitMQ client

Import a client class, for example:

from yapw.clients import Blocking
from yapw.clients import AsyncConsumer
# Or, if the client will exclusively publish messages:
from yapw.clients import Async

Publish messages outside a consumer callback

from yapw.clients import Blocking


publisher = Blocking(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
publisher.publish({"message": "value"}, routing_key="messages")
publisher.close()

Or, with the asynchronous client:

from yapw.clients import Async


class Publisher(Async):
    def exchange_ready(self):
        self.publish({"message": "value"}, routing_key="messages")
        self.interrupt()  # only if you want to stop the client


publisher = Publisher(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
publisher.start()

The routing key is namespaced by the exchange name, to make it “myexchange_messages”.
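
As a rough illustration of the namespacing described above, the sketch below formats a routing key with a hypothetical `namespaced()` helper. The helper name and the hard-coded `exchange_key` pattern are assumptions inferred from the “myexchange_messages” example; they are not yapw's API.

```python
def namespaced(exchange, routing_key):
    """Hypothetical helper: prefix the routing key with the exchange name."""
    return f"{exchange}_{routing_key}"


formatted = namespaced("myexchange", "messages")
print(formatted)  # myexchange_messages
```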

Consume messages

from yapw.clients import Blocking
from yapw.decorators import discard
from yapw.methods import ack, nack, publish


def callback(state, channel, method, properties, body):
    try:
        key = body["key"]
        # do work
        publish(state, channel, {"message": "value"}, "myroutingkey")
    except KeyError:
        nack(state, channel, method.delivery_tag)
    else:
        ack(state, channel, method.delivery_tag)


consumer = Blocking(
    url="amqp://user:pass@127.0.0.1",
    exchange="myexchange",
    prefetch_count=5,
)
consumer.consume(callback, queue="messages", decorator=discard)

Or, with the asynchronous client:

from yapw.clients import AsyncConsumer
from yapw.decorators import discard
from yapw.methods import ack, nack, publish


def callback(state, channel, method, properties, body):
    try:
        key = body["key"]
        # do work
        publish(state, channel, {"message": "value"}, "myroutingkey")
    except KeyError:
        nack(state, channel, method.delivery_tag)
    else:
        ack(state, channel, method.delivery_tag)


consumer = AsyncConsumer(
    url="amqp://user:pass@127.0.0.1",
    exchange="myexchange",
    prefetch_count=5,
    on_message_callback=callback,
    queue="messages",
    decorator=discard,
)
consumer.start()

yapw implements a pattern whereby the consumer declares and binds a queue. By default, the queue’s name and binding key are the same, and are namespaced by the exchange name.

To manually set the binding keys:

consumer.consume(
    callback,
    queue="messages",
    routing_keys=["a", "b"],
    decorator=discard,
)

Or, with the AsyncConsumer client:

consumer = AsyncConsumer(
    # ...
    on_message_callback=callback,
    queue="messages",
    routing_keys=["a", "b"],
    decorator=discard
)
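
The relationship between the queue name and the binding keys can be sketched with the standard library alone. The `queue_and_binding_keys()` helper below is hypothetical, and it assumes that manually set binding keys are namespaced in the same way as the queue name; check the yapw.clients documentation before relying on that.

```python
def queue_and_binding_keys(exchange, queue, routing_keys=None):
    """Hypothetical helper: return the namespaced queue name and binding keys."""
    if not routing_keys:
        # By default, the binding key is the same as the queue name.
        routing_keys = [queue]
    queue_name = f"{exchange}_{queue}"
    # Assumption: manually set keys are namespaced like the queue name.
    binding_keys = [f"{exchange}_{key}" for key in routing_keys]
    return queue_name, binding_keys


default = queue_and_binding_keys("myexchange", "messages")
manual = queue_and_binding_keys("myexchange", "messages", routing_keys=["a", "b"])
```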

yapw uses a thread pool to run the consumer callback in separate threads.
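
The general idea of running callbacks in a thread pool can be sketched as follows. This uses `concurrent.futures` from the standard library and a simplified one-argument `callback`; it illustrates the pattern only and is not yapw's implementation. The pool size and thread name prefix are arbitrary choices.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback(body):
    # Report which thread handled the message, along with the message's key.
    return threading.current_thread().name, body["key"]


# Each submitted message is handled by a worker thread, not the calling thread.
with ThreadPoolExecutor(max_workers=5, thread_name_prefix="consumer") as executor:
    futures = [executor.submit(callback, {"key": n}) for n in range(3)]
    results = [future.result() for future in futures]
```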

See also

yapw.clients for details on the consumer callback function signature.

Channel methods

The ack(), nack() and publish() functions are safe to call from a consumer callback running in another thread. They log an error if the connection or channel isn’t open.
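
One common way to make a channel method safe to call from another thread is to hand the call to the thread that owns the connection, which is what Pika's `add_callback_threadsafe()` is for. The sketch below shows that pattern with hypothetical stand-ins (`StubConnection`, `StubChannel`, `safe_ack()`) so that it runs without a broker; it is not yapw's code.

```python
import functools
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class StubChannel:
    """Hypothetical stand-in for a Pika channel; records acknowledged tags."""

    def __init__(self):
        self.is_open = True
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class StubConnection:
    """Hypothetical stand-in for a connection owned by the main thread."""

    def __init__(self):
        self.is_open = True
        self._callbacks = queue.Queue()

    def add_callback_threadsafe(self, callback):
        # Safe to call from any thread: this only enqueues the work.
        self._callbacks.put(callback)

    def process_callbacks(self):
        # Called from the thread that owns the connection.
        while not self._callbacks.empty():
            self._callbacks.get()()


def safe_ack(connection, channel, delivery_tag):
    if connection.is_open and channel.is_open:
        connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag))
    else:
        logger.error("Can't ack %s: the connection or channel isn't open", delivery_tag)


connection, channel = StubConnection(), StubChannel()
worker = threading.Thread(target=safe_ack, args=(connection, channel, 1))
worker.start()
worker.join()
connection.process_callbacks()  # the ack runs here, in the owning thread
```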

Note

Thread-safe helper functions have not yet been defined for all relevant channel methods.

Encoding and decoding

By default, messages are published with the content type “application/json”, and the message body is encoded using the default_encode() function, which serializes to JSON-formatted bytes when the content type is “application/json”.

Similarly, when consuming messages, the default_decode() function is used, which deserializes from JSON-formatted bytes when the consumed message’s content type is “application/json”. (That is how the sample code above can read body["key"] without decoding.)

You can change this behavior. For example, change the bodies of the encode and decode functions below:

import json

from yapw.clients import Blocking


# Return bytes.
def encode(message, content_type):
    if content_type == "application/json":
        return json.dumps(message).encode()
    # For any other content type, the message is returned unchanged.
    return message


# Accept body as bytes.
def decode(body, content_type):
    if content_type == "application/json":
        return json.loads(body)
    # For any other content type, the body is returned unchanged.
    return body


client = Blocking(encode=encode, decode=decode)

Error handling

The decorator keyword argument to the consume() method is a function that wraps the consumer callback (the first argument to the consume method). This function can be used to:

  • Offer conveniences to the consumer callback, like decoding the message body

  • Handle unexpected errors from the consumer callback

Note

For the AsyncConsumer client, the decorator and callback arguments are passed to its __init__() method.

When using consumer prefetch, if a message is not ack’d or nack’d, then RabbitMQ stops delivering messages. As such, it’s important to handle unexpected errors by either acknowledging the message or halting the process. Otherwise, the process will stall.

The default decorator is the yapw.decorators.halt() function, which shuts down the client in the main thread, without acknowledging the message. See the available decorators and the rationale for the default setting.

All decorators also decode the message body, which can be configured as above. If an exception occurs while decoding, the decorator shuts down the client in the main thread, without acknowledging the message.
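
To make the role of a decorator concrete, here is a minimal sketch that uses only the standard library. The `nack_on_error()` function, its signature and the `nack` callable are hypothetical and simplified; they do not mirror the signature of yapw's own decorators, which is documented in yapw.decorators.

```python
import json
import logging

logger = logging.getLogger(__name__)


def decode(body, content_type):
    if content_type == "application/json":
        return json.loads(body)
    return body


def nack_on_error(decode, callback, nack, delivery_tag, content_type, body):
    """Hypothetical decorator: decode the body, run the callback, nack on errors."""
    # Decoding happens outside the try block, so a decoding error propagates.
    message = decode(body, content_type)
    try:
        callback(message)
    except Exception:
        logger.exception("Unhandled exception when consuming %r", body)
        # Without an ack or nack, a prefetch-limited consumer would stall.
        nack(delivery_tag)


def failing_callback(message):
    raise ValueError("unexpected")


nacked = []
nack_on_error(decode, failing_callback, nacked.append, 7, "application/json", b'{"key": 1}')
```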

Signal handling

Every client shuts down gracefully if it receives the SIGTERM (system exit) or SIGINT (keyboard interrupt) signals. It stops consuming messages, waits for threads to terminate, and closes the RabbitMQ connection.

Signal handlers are added only if a client is instantiated in the main thread, because only the main thread is allowed to set a new signal handler. For example, a thread that handles a web request is typically not the main thread.
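
The main-thread check can be sketched with the standard library's `signal` and `threading` modules. The function and variable names below are illustrative assumptions, not yapw's code.

```python
import signal
import threading

stop = threading.Event()


def handle_signal(signum, frame):
    # A real client would stop consuming, wait for threads and close the connection.
    stop.set()


def install_signal_handlers():
    """Install handlers only in the main thread; return whether they were installed."""
    if threading.current_thread() is not threading.main_thread():
        return False
    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)
    return True


# In a worker thread, the handlers are skipped, so signal.signal() never raises ValueError.
results = []
worker = threading.Thread(target=lambda: results.append(install_signal_handlers()))
worker.start()
worker.join()
```

Calling `install_signal_handlers()` from the main thread would register both handlers and return `True`.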

Copyright (c) 2021 Open Contracting Partnership, released under the BSD license