Automatic API Docs

These are API docs for the public-facing parts of Serena.

Connection API

async with serena.open_connection(address, *, port=5672, username='guest', password='guest', virtual_host='/', ssl_context=None, **kwargs)

Opens a new connection to the AMQP 0-9-1 server. This is an asynchronous context manager.

Required parameters:

Parameters:

address (str | PathLike[str]) – The address of the server or the absolute path of its Unix socket.

Return type:

AsyncGenerator[AMQPConnection, None]

Optional parameters:

Parameters:
  • port (int) – The port to connect to. Ignores for Unix sockets. Defaults to 5672.

  • username (str) – The username to connect using. Defaults to guest.

  • password (str) – The password to authenticate with. Defaults to guest.

  • virtual_host (str) – The AMQP virtual host to connect to. Defaults to /.

  • ssl_context (Optional[SSLContext]) – The SSL context to connect with. Defaults to None.

In addition, some parameters are passed directly to the class, which allows customising some protocol details.

Parameters:
  • heartbeat_interval – The heartbeat interval to negotiate with, in seconds. This may not be the actual heartbeat interval used.

  • channel_buffer_size – The buffer size for channel messages.

  • desired_frame_size – The maximum body message frame size desired.

Warning

This uses a TaskGroup underneath; connection-wide closee errors will be transformed into a ExceptionGroup.

class serena.AMQPConnection(stream, *, heartbeat_interval=60, channel_buffer_size=48, desired_frame_size=131072)

Bases: object

A single AMQP connection.

property open: bool

Checks if this connection is actually open.

has_capability(name)

Checks if the server exposes a capability.

Return type:

bool

heartbeat_statistics()

Returns a copy of the heartbeat statistics for this connection.

Return type:

HeartbeatStatistics

async with open_channel()

Opens a new channel.

Return type:

AsyncGenerator[Channel, None]

Returns:

An asynchronous context manager that will open a new channel.

async with open_channel_pool(initial_channels=64)

Opens a new channel pool.

Parameters:

initial_channels (int) – The number of channels to use initially.

Return type:

AsyncGenerator[ChannelPool, None]

Returns:

An asynchronous context manager that will open a new channel pool.

await close(reply_code=200, reply_text='Normal close')

Closes the connection. This method is idempotent.

There’s no real reason to call this method.

Parameters:
  • reply_code (int) – The code to send when closing.

  • reply_text (str) – The text to send when replying.

Return type:

None

Returns:

Nothing.

Channel-like API

This is shared between Channel and ChannelPool.

class serena.mixin.ChannelLike

Bases: ABC

Base object shared between the Channel and ChannelPool object.

abstractmethod await exchange_declare(name, type, *, passive=False, durable=False, auto_delete=False, internal=False, arguments=None)

Declares a new exchange.

Parameters:
  • name (str) – The name of the exchange. Must not be empty.

  • type (ExchangeType | str) – The type of the exchange to create.

  • passive (bool) – If True, the server will return a DeclareOk if the exchange exists, and an error if it doesn’t. This can be used to inspect server state without modification.

  • durable (bool) – If True, then the declared exchange will survive a server restart.

  • auto_delete (bool) – If True, then the declared exchange will be automatically deleted when all queues have finished using it.

  • internal (bool) – If True, then the exchange may not be used directly by publishers.

  • arguments (Optional[dict[str, Any]]) – A dictionary of implementation-specific arguments.

Return type:

str

Returns:

The name of the exchange, as it exists on the server.

abstractmethod await exchange_delete(name, *, if_unused=False)

Deletes an exchange.

Parameters:
  • name (str) – The name of the exchange to delete.

  • if_unused (bool) – If True, then the exchange will only be deleted if it has no queue bindings.

Return type:

None

Returns:

Nothing.

abstractmethod await exchange_bind(destination, source, routing_key, arguments=None)

Binds an exchange to another exchange. This is a RabbitMQ extension and may not be supported in other AMQP implementations.

Parameters:
  • destination (str) – The name of the destination exchange to bind. A blank name means the default exchange.

  • source (str) – The name of the source exchange to bind. A blank name means the default exchange.

  • routing_key (str) – The routing key for the exchange binding.

  • arguments (Optional[dict[str, Any]]) – A dictionary of implementation-specific arguments.

Return type:

None

Returns:

Nothing.

abstractmethod await exchange_unbind(destination, source, routing_key, arguments=None)

Unbinds an exchange from another exchange. This is a RabbitMQ extension and may not be supported in other AMQP implementations.

Parameters:
  • destination (str) – The name of the destination exchange to unbind. A blank name means the default exchange.

  • source (str) – The name of the source exchange to unbind. A blank name means the default exchange.

  • routing_key (str) – The routing key for the exchange binding that is being unbinded.

  • arguments (Optional[dict[str, Any]]) – A dictionary of implementation-specific arguments.

Return type:

None

Returns:

Nothing.

abstractmethod await queue_declare(name, *, passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)

Declares a queue.

Parameters:
  • name (str) – The name of the queue. If blank, a name will be automatically generated by the server and returned.

  • passive (bool) – If True, the server will return a DeclareOk if the queue exists, and an error if it doesn’t. This can be used to inspect server state without modification.

  • durable (bool) – If True, the queue being created will persist past server restarts.

  • exclusive (bool) – If True, this queue will only belong to this connection, and will be automatically deleted when the connection closes. Best combined with an automatically generated queue name.

  • auto_delete (bool) – If True, this queue will be automatically deleted after all consumers have finished. The queue will never be deleted before the first consumer starts.

  • arguments (Optional[dict[str, Any]]) – Optional server implementation-specific arguments.

Return type:

QueueDeclareOkPayload

Returns:

The QueueDeclareOkPayload the server returned.

abstractmethod await queue_bind(queue_name, exchange_name, routing_key, arguments=None)

Binds a queue to an exchange.

Parameters:
  • queue_name (str) – The queue to bind.

  • exchange_name (str) – The exchange to bind to.

  • routing_key (str) – The routing key to use when binding.

  • arguments (Optional[dict[str, Any]]) – Any server-specific or exchange-specific extra arguments.

Return type:

None

Returns:

Nothing.

abstractmethod await queue_delete(queue_name, *, if_empty=False, if_unused=False)

Deletes a queue.

Parameters:
  • queue_name (str) – The name of the queue to delete.

  • if_empty (bool) – If True, the queue will only be deleted if it is empty.

  • if_unused (bool) – If True, the queue will only be deleted if it is unused.

Return type:

int

Returns:

The number of messages deleted.

abstractmethod await queue_purge(queue_name)

Purges all messages from a queue.

Parameters:

queue_name (str) – The name of the queue to be purged.

Return type:

int

Returns:

The number of messages deleted.

abstractmethod await queue_unbind(queue_name, exchange_name, routing_key, arguments=None)

Unbinds a queue from an exchange.

Parameters:
  • queue_name (str) – The name of the queue to unbind.

  • exchange_name (str) – The name of the exchange to unbind from.

  • routing_key (str) – The routing key to unbind using.

  • arguments (Optional[dict[str, Any]]) – Implementation-specific arguments to use.

Return type:

None

abstractmethod basic_consume(queue_name, consumer_tag='', *, no_local=False, no_ack=False, exclusive=False, auto_ack=True, arguments=None)

Starts a basic consume operation. This returns an async context manager over an asynchronous iterator that yields incoming AMQPMessage instances.

The channel can still be used for other operations during this operation.

Parameters:
  • queue_name (str) – The name of the queue to consume from.

  • consumer_tag (str) – The tag for this consume.

  • no_local (bool) – If True, messages will not be sent to this consumer if it is on the same connection that published them.

  • no_ack (bool) – If True, messages will not be expected to be acknowledged. This can cause data loss.

  • exclusive (bool) – If True, then only this consumer can access the queue. Will fail if there is another consumer already active.

  • arguments (Optional[dict[str, Any]]) – Implementation-specific arguments.

  • auto_ack (bool) – If True, then messages will be automatically positively acknowledged in the generator loop. Has no effect if no_ack is True. This is a Serena-exclusive feature, not a protocol feature.

Return type:

AbstractAsyncContextManager[AsyncIterable[AMQPMessage]]

abstractmethod await basic_publish(exchange_name, routing_key, body, *, header=None, mandatory=True, immediate=False)

Publishes a message to a specific exchange.

Parameters:
  • exchange_name (str) – The name of the exchange to publish to. This can be blank to mean the default exchange.

  • routing_key (str) – The routing key to publish to.

  • body (bytes) – The body for this payload.

  • header (Optional[BasicHeader]) – The headers to use for this message. If unset, will use the default blank headers.

  • mandatory (bool) – Iff True, the server must return a Return message if the message could not be routed to a queue.

  • immediate (bool) – Iff True, the server must return a Return message if the message could not be immediately consumed.

Raises:

MessageReturnedError – If the message was returned to the publisher.

Return type:

None

Warning

The immediate flag is not supported in RabbitMQ 3.x, and will cause the connection to close.

abstractmethod await basic_get(queue, *, no_ack=False)

Gets a single message from a queue.

Parameters:
  • queue (str) – The queue to get the message from.

  • no_ack (bool) – Iff not True, then messages will need to be explicitly acknowledged on consumption.

Return type:

Optional[AMQPMessage]

Returns:

A AMQPMessage if one existed on the queue, otherwise None.

Specific APIs

await Channel.wait_until_closed()

Waits until the channel is closed.

Return type:

None

Message API

class serena.AMQPMessage(channel, envelope, header, body)

Bases: object

The wrapper around a single, delivered AMQP message.

envelope: AMQPEnvelope

The “envelope” for the message. Wraps data about the delivery of the message.

header: BasicHeader

The header for the message, containing application-specific details.

body: bytes

The actual body of this message.

await ack(*, multiple=False)

Acknowledges this message. See basic_ack.

Return type:

None

await nack(*, multiple=False, requeue=True)

Negatively acknowledges this message. See basic_nack.

Return type:

None

await reject(*, requeue=True)

Rejects this message. See basic_reject.

Return type:

None