Federated Queues

Overview

This guide covers federated queues, a subset of functionality offered by the Federation plugin.

A separate Federation plugin reference guide is available.

In addition to federated exchanges, RabbitMQ supports federated queues. This feature provides a way of balancing the load of a single logical queue across nodes or clusters. It does so by moving messages to other federation peers (nodes or clusters) when the local queue has no consumers.

While federated exchanges replicate their stream of messages from the upstream to one or more downstreams, federated queues move data to where the consumers are, always preferring local consumers to remote ones. Federated queues are considered to be equal peers: there is no “leader/follower” relationship between them as there is with federated exchanges.

A federated queue links to its federated peers (called upstream queues). It retrieves messages from upstream queues in order to satisfy demand for messages from local consumers. The upstream queues do not need to be reconfigured. They are assumed to be located on a separate node or in a separate cluster.

An upstream definition is a URI with certain recognised query parameters that control link connection parameters. Upstreams can be managed using CLI tools or the HTTP API with an additional plugin.

The following diagram demonstrates several federated and unfederated queues in two RabbitMQ nodes connected using queue federation:

Overview of federated queues

When queue federation is used, usually only a subset of queues in a cluster is federated. Some queues can be inherently local to the “site” (cluster) and its uses.

Use Cases

The typical use would be to have the same “logical” queue distributed over many brokers. Each broker would declare a federated queue with all the other federated queues upstream. The links would form a complete bi-directional graph on the federated peers (nodes or clusters).
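
To make the "complete graph" idea concrete, the following sketch prints, for each broker, the upstream definitions it would need so that every other peer is one of its upstreams. The peer names and URIs are hypothetical, and the script only prints commands, it does not run them:

```shell
# Hypothetical peer list: "name=uri" entries, one per broker in the mesh.
PEERS="site-a=amqp://site-a.example.local:5672 site-b=amqp://site-b.example.local:5672 site-c=amqp://site-c.example.local:5672"

# Print the set_parameter commands to run on broker $1:
# one federation upstream for every peer except the broker itself.
print_upstream_commands() {
    self="$1"
    for peer in $PEERS; do
        name="${peer%%=*}"
        uri="${peer#*=}"
        if [ "$name" = "$self" ]; then
            continue
        fi
        printf "rabbitmqctl set_parameter federation-upstream %s '{\"uri\":\"%s\"}'\n" "$name" "$uri"
    done
}

# Show what would be run on each broker (nothing is executed here).
for entry in $PEERS; do
    broker="${entry%%=*}"
    echo "# on $broker"
    print_upstream_commands "$broker"
done
```

With three peers, each broker ends up with two upstreams, so the mesh has six directed links in total.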

Such a logical distributed queue can have a higher capacity than a single queue. It will perform best when there is some degree of locality, that is, when as many messages as possible are consumed from the same queue they were published to, and the federation mechanism only needs to move messages around in order to balance the load.

Limitations

Federated queues have a number of limitations and differences compared to their non-federated peers as well as to federated exchanges.

Queue federation will not propagate bindings from the downstream to the upstreams.

Applications that use basic.get (consume via polling, a highly discouraged practice) cannot retrieve messages over federation if there aren’t any in a local queue (on the node the client is connected to). Since basic.get is a synchronous method, the node serving a request would have to block while contacting all the other nodes to retrieve more messages. This wouldn’t sit well with federation’s availability and partition tolerance-oriented design and use cases.

Brokers running different versions of RabbitMQ can be connected using federation. However, since queue federation requires consumer priorities, it is not possible to federate a queue with a broker running a RabbitMQ version prior to 3.2.0.

Usage and Configuration

Federated queues are declared just like any other queue, by applications. In order for RabbitMQ to recognize that a queue needs to be federated, and what other nodes messages should be consumed from, downstream (consuming) nodes need to be configured.

Federated queue policies

Federation configuration uses runtime parameters and policies, which means it can be configured and reconfigured on the fly as system topology changes. There are two key pieces of configuration involved:

  • Upstreams: these are remote endpoints in a federated system
  • Federation policies: these control what queues are federated and what upstreams (peers) they will connect to

Both of those are configured on the downstream (consuming) nodes or clusters, that is, on the side that opens federation links to its upstreams.

To add an upstream, use the rabbitmqctl set_parameter command. It accepts three parameters:

  • Parameter type, federation-upstream
  • An upstream name that federation policies will refer to
  • An upstream definition JSON document with at least one mandatory key, uri

The following example configures an upstream named “origin” which can be contacted at localhost:5673:

# Adds a federation upstream named "origin"
rabbitmqctl set_parameter federation-upstream origin '{"uri":"amqp://localhost:5673"}'

On Windows, use rabbitmqctl.bat and suitable PowerShell quoting:

# Adds a federation upstream named "origin"
rabbitmqctl.bat set_parameter federation-upstream origin "{""uri"":""amqp://localhost:5673""}"
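
The commands above target the default virtual host. As a minimal sketch, the same parameter can be set in another virtual host by passing -p to rabbitmqctl set_parameter. The helper name add_upstream and the virtual host name used in the usage note are hypothetical and only serve as an illustration:

```shell
# Hypothetical helper: add a federation upstream in a given virtual host.
# Arguments: virtual host, upstream name, upstream URI.
add_upstream() {
    vhost="$1"
    name="$2"
    uri="$3"
    # Builds the same JSON document as above, with "uri" as its only key.
    rabbitmqctl set_parameter -p "$vhost" federation-upstream "$name" "{\"uri\":\"$uri\"}"
}
```

For example, add_upstream orders-vhost origin amqp://localhost:5673 would add the upstream in a virtual host named orders-vhost (an assumed name). The result can then be inspected with rabbitmqctl list_parameters -p orders-vhost.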

Once an upstream has been specified, a policy that controls federation can be added. It is added just like any other policy, using rabbitmqctl set_policy:

# Adds a policy named "queue-federation"
rabbitmqctl set_policy queue-federation \
    "^federated\." \
    '{"federation-upstream-set":"all"}' \
    --priority 10 \
    --apply-to queues

Here’s a Windows version of the above example:

# Adds a policy named "queue-federation"
rabbitmqctl.bat set_policy queue-federation ^
    "^federated\." ^
    "{""federation-upstream-set"":""all""}" ^
    --priority 10 ^
    --apply-to queues

In the example above, the policy will match queues whose name begins with a federated. prefix in the default virtual host. Those queues will set up federation links for all declared upstreams. The name of the policy is queue-federation. As with any policy, if multiple policies match a queue, the one with the highest priority will be used. Multiple policy definitions will not be combined, even if their priorities are equal.
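
To see which queue names such a pattern selects, it can be tried locally before the policy is applied. The sketch below uses grep -E, which only approximates the regular expression engine used by the broker, and the queue names are made up for illustration:

```shell
# Pattern from the policy above.
PATTERN='^federated\.'

# Print "match" if the queue name would be selected by the pattern.
# Note: grep -E is only an approximation of the broker's regex engine.
policy_matches() {
    if printf '%s\n' "$1" | grep -Eq "$PATTERN"; then
        echo "match"
    else
        echo "no match"
    fi
}

# Hypothetical queue names.
for q in federated.orders federated.invoices.eu orders federatedXorders; do
    printf '%s: %s\n' "$q" "$(policy_matches "$q")"
done
```

Because the dot is escaped, a name such as federatedXorders is not selected. Only names that literally start with "federated." are.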

Once configured, a federation link (connection) will be opened for every matching queue and upstream pair. A “matching queue” here is a queue that is matched by the federation policy pattern. If no queues match, no links will be started.
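
As a rough illustration of the "one link per pair" rule, the sketch below enumerates the links one would expect for a given set of matched queues and upstreams. Both lists are hypothetical inputs, not something read from a broker:

```shell
# Hypothetical inputs: queues matched by the policy and declared upstreams.
MATCHED_QUEUES="federated.orders federated.invoices"
UPSTREAMS="origin backup"

# Print one line per expected federation link (queue and upstream pair).
expected_links() {
    for q in $MATCHED_QUEUES; do
        for u in $UPSTREAMS; do
            printf '%s <- %s\n' "$q" "$u"
        done
    done
}

expected_links
```

Two matched queues and two upstreams give four links. On a running node with the plugin enabled, the actual links can be compared against this list using rabbitmqctl federation_status, assuming a RabbitMQ version that provides that command.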

To disable federation for the matching queues, delete the policy using its name:

rabbitmqctl clear_policy queue-federation
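
Deleting the policy stops federation for the matching queues but leaves the upstream definition in place. If the upstream is no longer needed either, it can be removed with rabbitmqctl clear_parameter. A minimal sketch, with a hypothetical helper name:

```shell
# Hypothetical helper: remove the federation policy, then the upstream.
# Arguments: policy name, upstream name.
remove_queue_federation() {
    policy="$1"
    upstream="$2"
    # Only clear the upstream if the policy was removed successfully.
    rabbitmqctl clear_policy "$policy" &&
        rabbitmqctl clear_parameter federation-upstream "$upstream"
}
```

For the examples in this guide, the call would be remove_queue_federation queue-federation origin.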

Complex Topologies and Loop Handling

A federated queue can be “upstream” from another federated queue. One can even form “loops”: for example, queue A declares queue B to be upstream from it, and queue B declares queue A to be upstream from it. More complex multiply-connected arrangements are allowed. Such complex topologies will be increasingly difficult to reason about and troubleshoot, however.

Unlike federated exchanges, queue federation does not replicate data and does not handle loops explicitly. There is no limit to how many times a message can be forwarded between federated queues.

In a set of mutually-federated queues, messages will move to where the spare consuming capacity is, so if the spare consuming capacity keeps moving around, so will the messages. Since messages are moved to remote nodes only when there are no local consumers, it is rare for a message to be moved across all nodes and “wrap around”.

Implementation

The federated queue will connect to all its upstream queues using AMQP 0-9-1 (optionally secured with TLS).

The federated queue will only retrieve messages when it has run out of messages locally, it has consumers that need messages, and the upstream queue has “spare” messages that are not being consumed. The intent is to ensure that messages are only transferred between federated queues when needed. This is implemented using consumer priorities.

If messages are forwarded from one queue to another then message ordering is only preserved for messages which have made exactly the same journey between nodes. In some cases messages which were adjacent when published may take different routes to the same node to be consumed; therefore messages can be reordered in the presence of queue federation.

Each individual queue applies its arguments separately; for example if you set x-max-length on a federated queue then that queue will have its length limited (possibly discarding messages when it gets full) but other queues that are federated with it will not be affected. Note in particular that when per-queue or per-message TTL is in use, a message will have its timer reset when it is transferred to another queue.
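
When such limits are set via a policy rather than queue arguments, keep in mind that policy definitions are not combined: the limit and the federation setting have to live in the same policy for both to apply to a queue. The sketch below assumes the max-length policy key (the policy counterpart of the x-max-length argument), and the helper name is hypothetical:

```shell
# Hypothetical helper: one policy that both federates matching queues
# and caps their length on this node only.
# Argument: maximum queue length in messages.
set_federation_policy_with_limit() {
    max_length="$1"
    rabbitmqctl set_policy queue-federation \
        "^federated\." \
        "{\"federation-upstream-set\":\"all\",\"max-length\":$max_length}" \
        --priority 10 \
        --apply-to queues
}
```

Calling set_federation_policy_with_limit 1000 on one node limits matching queues there to 1000 messages. Federated peers on other nodes keep whatever limits their own policies or arguments define.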

Pitfalls

Federated queues cannot currently cause messages to traverse multiple hops between brokers based solely on need for messages in one place. For example, if you federate queues on nodes A, B and C, with A and B connected and B and C connected, but not A and C, then if messages are available at A and consumers waiting at C then messages will not be transferred from A to C via B unless there is also a consumer at B.

It is possible to bind a federated queue to a federated exchange. However, the results may be unexpected. Since a federated exchange will retrieve messages from its upstream that match its bindings, any message published to a federated exchange will be copied to any nodes that have matching bindings. A federated queue will then move these messages around between nodes, and it is therefore possible to end up with multiple copies of the same message on the same node.