kombu.transport.virtual

Virtual transport implementation.

Emulates the AMQ API for non-AMQ transports.

copyright:
  1. 2009, 2011 by Ask Solem.
license:

BSD, see LICENSE for more details.

Transports

class kombu.transport.virtual.Transport(client, **kwargs)

Virtual transport.

Parameters:clientBrokerConnection instance
Channel

channel class used.

Cycle

cycle class used.

interval

default interval between polling channels for new events.

default_port

port number used when no port is specified.

state

BrokerState containing declared exchanges and bindings (set by constructor).

cycle

FairCycle instance used to fairly drain events from channels (set by constructor).

establish_connection()
close_connection(connection)
create_channel(connection)
close_channel(channel)
drain_events(connection, timeout=None)

Channel

class kombu.transport.virtual.AbstractChannel

This is an abstract class defining the channel methods you’d usually want to implement in a virtual channel.

Do not subclass directly, but rather inherit from Channel instead.

class kombu.transport.virtual.Channel(connection, **kwargs)

Virtual channel.

Parameters:connection – The transport instance this channel is part of.
Message

message class used.

state

Broker state containing exchanges and bindings.

qos

QoS manager for this channel.

do_restore

flag to restore unacked messages when channel goes out of scope.

exchange_types

mapping of exchange types and corresponding classes.

exchange_declare(exchange, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False)

Declare exchange.

exchange_delete(exchange, if_unused=False, nowait=False)

Delete exchange and all its bindings.

queue_declare(queue, passive=False, **kwargs)

Declare queue.

queue_delete(queue, if_unusued=False, if_empty=False, **kwargs)

Delete queue.

queue_bind(queue, exchange, routing_key, arguments=None, **kwargs)

Bind queue to exchange with routing key.

queue_purge(queue, **kwargs)

Remove all ready messages from queue.

basic_publish(message, exchange, routing_key, **kwargs)

Publish message.

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

Consume from queue

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_get(queue, **kwargs)

Get message by direct access (synchronous).

basic_ack(delivery_tag)

Acknowledge message.

basic_recover(requeue=False)

Recover unacked messages.

basic_reject(delivery_tag, requeue=False)

Reject message.

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)

Change QoS settings for this channel.

Only prefetch_count is supported.

get_table(exchange)

Get table of bindings for exchange.

typeof(exchange)

Get the exchange type instance for exchange.

drain_events(timeout=None)
prepare_message(message_data, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Prepare message data.

message_to_python(raw_message)

Convert raw message to Message instance.

flow(active=True)

Enable/disable message flow.

Raises NotImplementedError:
 as flow is not implemented by the base virtual implementation.
close()

Close channel, cancel all consumers, and requeue unacked messages.

Message

class kombu.transport.virtual.Message(channel, payload, **kwargs)
exception MessageStateError

The message has already been acknowledged.

args
message
Message.ack()

Acknowledge this message as being processed., This will remove the message from the queue.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.acknowledged

Set to true if the message has been acknowledged.

Message.decode()

Deserialize the message body, returning the original python structure sent by the publisher.

Message.payload

The decoded message body.

Message.reject()

Reject this message.

The message will be discarded by the server.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.requeue()

Reject this message and put it back on the queue.

You must not use this method as a means of selecting messages to process.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.serializable()

Quality Of Service

class kombu.transport.virtual.QoS(channel, prefetch_count=0)

Quality of Service guarantees.

Only supports prefetch_count at this point.

Parameters:
  • channel – AMQ Channel.
  • prefetch_count – Initial prefetch count (defaults to 0).
ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

can_consume()

Returns true if the channel can be consumed from.

Used to ensure the client adhers to currently active prefetch limits.

prefetch_count

current prefetch count value

reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_unacked()

Restore all unacknowledged messages.

restore_unacked_once()

Restores all uncknowledged message at shutdown/gc collect.

Will only be done once for each instance.

In-memory State

class kombu.transport.virtual.BrokerState(exchanges=None, bindings=None)
bindings

active bindings.

exchanges

exchange declarations.

Table Of Contents

Previous topic

kombu.transport.base

Next topic

kombu.transport.virtual.exchange

This Page