API
Basics
There are two principal objects when using aioamqp:
The protocol object, used to begin a connection to aioamqp,
The channel object, used when creating a new channel to effectively use an AMQP channel.
Starting a connection
Starting a connection to AMQP really mean instanciate a new asyncio Protocol subclass.
- aioamqp.connect(host, port, login, password, virtualhost, ssl, login_method, insist, protocol_factory, verify_ssl, loop, kwargs) Transport, AmqpProtocol
Convenient method to connect to an AMQP broker
- Parameters
host (str) – the host to connect to
port (int) – broker port
login (str) – login
password (str) – password
virtualhost (str) – AMQP virtualhost to use for this connection
ssl (bool) – create an SSL connection instead of a plain unencrypted one
verify_ssl (bool) – verify server’s SSL certificate (True by default)
login_method (str) – AMQP auth method
insist (bool) – insist on connecting to a server
protocol_factory (AmqpProtocol) – factory to use, if you need to subclass AmqpProtocol
loop (EventLopp) – set the event loop to use
kwargs (dict) – arguments to be given to the protocol_factory instance
import asyncio
import aioamqp
async def connect():
try:
transport, protocol = await aioamqp.connect() # use default parameters
except aioamqp.AmqpClosedConnection:
print("closed connections")
return
print("connected !")
await asyncio.sleep(1)
print("close connection")
await protocol.close()
transport.close()
asyncio.get_event_loop().run_until_complete(connect())
In this example, we just use the method “start_connection” to begin a communication with the server, which deals with credentials and connection tunning.
If you’re not using the default event loop (e.g. because you’re using aioamqp from a different thread), call aioamqp.connect(loop=your_loop).
The AmqpProtocol uses the kwargs arguments to configure the connection to the AMQP Broker:
- AmqpProtocol.__init__(self, *args, **kwargs):
The protocol to communicate with AMQP
- Parameters
channel_max (int) – specifies highest channel number that the server permits. Usable channel numbers are in the range 1..channel-max. Zero indicates no specified limit.
frame_max (int) – the largest frame size that the server proposes for the connection, including frame header and end-byte. The client can negotiate a lower value. Zero means that the server does not impose any specific limit but may reject very large frames if it cannot allocate resources for them.
heartbeat (int) – the delay, in seconds, of the connection heartbeat that the server wants. Zero means the server does not want a heartbeat.
loop (Asyncio.EventLoop) – specify the eventloop to use.
client_properties (dict) – configure the client to connect to the AMQP server.
Handling errors
The connect() method has an extra ‘on_error’ kwarg option. This on_error is a callback or a coroutine function which is called with an exception as the argument:
import asyncio
import socket
import aioamqp
async def error_callback(exception):
print(exception)
async def connect():
try:
transport, protocol = await aioamqp.connect(
host='nonexistant.com',
on_error=error_callback,
client_properties={
'program_name': "test",
'hostname' : socket.gethostname(),
},
)
except aioamqp.AmqpClosedConnection:
print("closed connections")
return
asyncio.get_event_loop().run_until_complete(connect())
Publishing messages
A channel is the main object when you want to send message to an exchange, or to consume message from a queue:
channel = await protocol.channel()
When you want to produce some content, you declare a queue then publish message into it:
await channel.queue_declare("my_queue")
await channel.publish("aioamqp hello", '', "my_queue")
Note: we’re pushing message to “my_queue” queue, through the default amqp exchange.
Consuming messages
When consuming message, you connect to the same queue you previously created:
import asyncio
import aioamqp
async def callback(channel, body, envelope, properties):
print(body)
channel = await protocol.channel()
await channel.basic_consume(callback, queue_name="my_queue")
The basic_consume
method tells the server to send us the messages, and will call callback
with amqp response arguments.
The consumer_tag
is the id of your consumer, and the delivery_tag
is the tag used if you want to acknowledge the message.
In the callback:
the first
body
parameter is the messagethe
envelope
is an instance of envelope.Envelope class which encapsulate a group of amqp parameter such as:consumer_tag delivery_tag exchange_name routing_key is_redeliver
the
properties
are message properties, an instance ofproperties.Properties
with the following members:content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id app_id cluster_id
Server Cancellation
RabbitMQ offers an AMQP extension to notify a consumer when a queue is deleted.
See Consumer Cancel Notification
for additional details. aioamqp
enables the extension for all channels but
takes no action when the consumer is cancelled. Your application can be notified
of consumer cancellations by adding a callback to the channel:
async def consumer_cancelled(channel, consumer_tag):
# implement required cleanup here
pass
async def consumer(channel, body, envelope, properties):
await channel.basic_client_ack(envelope.delivery_tag)
channel = await protocol.channel()
channel.add_cancellation_callback(consumer_cancelled)
await channel.basic_consume(consumer, queue_name="my_queue")
The callback can be a simple callable or an asynchronous co-routine. It can be used to restart consumption on the channel, close the channel, or anything else that is appropriate for your application.
Queues
Queues are managed from the Channel object.
- Channel.queue_declare(queue_name, passive, durable, exclusive, auto_delete, no_wait, arguments, timeout) dict
Coroutine, creates or checks a queue on the broker
- Parameters
queue_name (str) – the queue to receive message from
passive (bool) – if set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. Checks for the same parameter as well.
durable (bool) – if set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts.
exclusive (bool) – request exclusive consumer access, meaning only this consumer can access the queue
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the queue.
timeout (int) – wait for the server to respond after timeout
Here is an example to create a randomly named queue with special arguments x-max-priority:
result = await channel.queue_declare( queue_name='', durable=True, arguments={'x-max-priority': 4} )
- Channel.queue_delete(queue_name, if_unused, if_empty, no_wait, timeout)
Coroutine, delete a queue on the broker
- Parameters
queue_name (str) – the queue to receive message from
if_unused (bool) – the queue is deleted if it has no consumers. Raise if not.
if_empty (bool) – the queue is deleted if it has no messages. Raise if not.
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the queue.
timeout (int) – wait for the server to respond after timeout
- Channel.queue_bind(queue_name, exchange_name, routing_key, no_wait, arguments, timeout)
Coroutine, bind a queue to an exchange
- Parameters
queue_name (str) – the queue to receive message from.
exchange_name (str) – the exchange to bind the queue to.
routing_key (str) – the routing_key to route message.
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the queue.
timeout (int) – wait for the server to respond after timeout
This simple example creates a queue, an exchange and bind them together.
channel = await protocol.channel() await channel.queue_declare(queue_name='queue') await channel.exchange_declare(exchange_name='exchange') await channel.queue_bind('queue', 'exchange', routing_key='')
- Channel.queue_unbind(queue_name, exchange_name, routing_key, arguments, timeout)
Coroutine, unbind a queue and an exchange.
- Parameters
queue_name (str) – the queue to receive message from.
exchange_name (str) – the exchange to bind the queue to.
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the queue.
timeout (int) – wait for the server to respond after timeout
- PARAM STR ROUTING_KEY
THE ROUTING_KEY TO ROUTE MESSAGE.
- Channel.queue_purge(queue_name, no_wait, timeout)
Coroutine, purge a queue
- Parameters
queue_name (str) – the queue to receive message from.
Exchanges
Exchanges are used to correctly route message to queue: a publisher publishes a message into an exchanges, which routes the message to the corresponding queue.
- Channel.exchange_declare(exchange_name, type_name, passive, durable, auto_delete, no_wait, arguments, timeout) dict
Coroutine, creates or checks an exchange on the broker
- Parameters
exchange_name (str) – the exchange to receive message from
type_name (str) – the exchange type (fanout, direct, topics …)
passive (bool) – if set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. Checks for the same parameter as well.
durable (bool) – if set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts.
auto_delete (bool) – if set, the exchange is deleted when all queues have finished using it.
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the exchange.
timeout (int) – wait for the server to respond after timeout
Note: the internal flag is deprecated and not used in this library.
channel = await protocol.channel() await channel.exchange_declare(exchange_name='exchange', auto_delete=True)
- Channel.exchange_delete(exchange_name, if_unused, no_wait, timeout)
Coroutine, delete a exchange on the broker
- Parameters
exchange_name (str) – the exchange to receive message from
if_unused (bool) – the exchange is deleted if it has no consumers. Raise if not.
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the exchange.
timeout (int) – wait for the server to respond after timeout
- Channel.exchange_bind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)
Coroutine, binds two exchanges together
- Parameters
exchange_destination (str) – specifies the name of the destination exchange to bind
exchange_source (str) – specified the name of the source exchange to bind.
exchange_destination – specifies the name of the destination exchange to bind
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the exchange.
timeout (int) – wait for the server to respond after timeout
- Channel.exchange_unbind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)
Coroutine, unbind an exchange from an exchange.
- Parameters
exchange_destination (str) – specifies the name of the destination exchange to bind
exchange_source (str) – specified the name of the source exchange to bind.
exchange_destination – specifies the name of the destination exchange to bind
no_wait (bool) – if set, the server will not respond to the method
arguments (dict) – AMQP arguments to be passed when creating the exchange.
timeout (int) – wait for the server to respond after timeout