Stream Plugin
Streams are a persistent, replicated data structure introduced in RabbitMQ 3.9 that models an append-only log with non-destructive consumer semantics. They can be used as a regular AMQP 0-9-1 queue or through a dedicated binary protocol plugin and its associated clients.
The Stream plugin is included in the RabbitMQ distribution. Before clients can successfully connect, it must be enabled using rabbitmq-plugins:
rabbitmq-plugins enable rabbitmq_stream
When no configuration is specified, the Stream Adapter will listen on
all interfaces on port 5552 with the default user login/passcode.
To change this, edit your configuration file
to contain a
tcp_listeners variable for the stream plugin.
For example, a minimalistic configuration file which changes the listener port to 12345 would look like:
stream.listeners.tcp.1 = 12345
while one which changes the listener to listen only on localhost (for both IPv4 and IPv6) would look like:
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
The plugin supports TCP listener option configuration.
The settings use a common prefix,
stream.tcp_listen_options, and control
things such as TCP buffer sizes, inbound TCP connection queue length, whether TCP keepalives
are enabled and so on. See the Networking guide for details.
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072
stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true
stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120
It is possible to set the maximum size of frames (default is 1 MiB) and the heartbeat (default is 60 seconds), if needed:
# in bytes
stream.frame_max = 2097152
# in seconds
stream.heartbeat = 120
Fast publishers can overwhelm the broker if it cannot keep up writing and replicating inbound messages.
For this reason, each connection has a maximum number of outstanding unconfirmed messages allowed before it is blocked
(initial_credits, defaults to 50,000). The connection is unblocked when a given number of messages
is confirmed (credits_required_for_unblocking, defaults to 12,500). You can change those values
according to your workload:
stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000
High values for these settings can improve publishing throughput at the cost of higher memory consumption (which can eventually crash the broker). Low values can help the broker cope with a large number of moderately fast publishing connections.
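The blocking and unblocking thresholds described above can be illustrated with a toy model. This is only a sketch of the behaviour as described in this guide, not the broker's implementation: the class name CreditedConnection and its methods are hypothetical, and it assumes a connection is blocked once its credits reach zero.

```python
class CreditedConnection:
    """Toy model of per-connection publisher credit (hypothetical, not broker code)."""

    def __init__(self, initial_credits=50_000, credits_required_for_unblocking=12_500):
        self.initial_credits = initial_credits
        self.credits_required_for_unblocking = credits_required_for_unblocking
        self.credits = initial_credits
        self.blocked = False

    def publish(self, count=1):
        """Try to publish `count` messages; return how many were accepted."""
        if self.blocked:
            return 0
        accepted = min(count, self.credits)
        self.credits -= accepted
        # Assumption: the connection blocks once all credits are used up.
        if self.credits == 0:
            self.blocked = True
        return accepted

    def confirm(self, count=1):
        """Record `count` confirmed messages, returning their credits."""
        self.credits = min(self.initial_credits, self.credits + count)
        # Unblock only after enough messages have been confirmed.
        if self.blocked and self.credits >= self.credits_required_for_unblocking:
            self.blocked = False
```

In this model, a higher initial_credits value lets a publisher keep more unconfirmed messages in flight before it is paused, which matches the throughput versus memory trade-off described above.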
The stream protocol allows clients to discover the topology of streams, that is, where the leader and replicas for a given set of streams are located in the cluster. This way a client can connect to the appropriate node to interact with a stream: the leader node to publish, a replica to consume. By default, nodes return their hostname and listener port, which is fine for most situations but not all (for example, a proxy sitting between the cluster nodes and the clients, or cluster nodes and/or clients running in containers).
The advertised_host and advertised_port keys specify what a broker node returns when asked
for the topology of streams. Set them according to your infrastructure so that clients can connect
to cluster nodes:
stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345
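As a rough sketch of how a client might use the topology information, the function below picks the leader for publishing and a replica for consuming. The dictionary layout, the function name pick_node and the fallback to the leader when no replica exists are all assumptions made for illustration. They are not part of the stream protocol or of any client library.

```python
import random


def pick_node(stream_topology, purpose):
    """Choose a (host, port) pair for a stream.

    `stream_topology` is a hypothetical dict of the form
    {"leader": (host, port), "replicas": [(host, port), ...]}.
    """
    if purpose == "publish":
        # Publishing goes to the leader node.
        return stream_topology["leader"]
    if purpose == "consume":
        replicas = stream_topology["replicas"]
        # Assumption: fall back to the leader if the stream has no replica.
        return random.choice(replicas) if replicas else stream_topology["leader"]
    raise ValueError(f"unknown purpose: {purpose!r}")
```

The host and port values in such a structure are whatever the nodes advertise, which is why the advertised settings matter when a proxy or container network sits between clients and the cluster.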
To use TLS for stream connections, TLS must be configured in the broker. To enable
TLS stream connections, add a TLS listener for streams using the
stream.listeners.ssl.* configuration keys.
The plugin will use core RabbitMQ server certificates and key (just like AMQP 0-9-1 and AMQP 1.0 listeners do):
ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551
This configuration creates a standard TCP listener on port 5552 and a TLS listener on port 5551.
When a TLS listener is set up, you may want to disable all non-TLS ones. This can be configured like so:
stream.listeners.tcp = none
stream.listeners.ssl.1 = 5551
Just like for plain connections, it is possible to configure
an advertised TLS port. The plugin will use the current TLS port for TLS connections,
but you can override this behavior with the stream.advertised_tls_port setting:
stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_port = 12344
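The way the advertised settings override the defaults can be summarised with a small model. This is a sketch of the behaviour described in this guide, not the plugin's code: the function advertised_endpoint and its parameters are hypothetical, and it assumes stream.advertised_host applies to both plain and TLS connections.

```python
def advertised_endpoint(settings, hostname, tcp_port, tls_port, tls):
    """Return the (host, port) a node would report to a client.

    `settings` is a plain dict of configuration keys to values;
    keys that are absent fall back to the node's own hostname and ports.
    """
    host = settings.get("stream.advertised_host", hostname)
    if tls:
        # TLS connections get the TLS listener port unless overridden.
        port = settings.get("stream.advertised_tls_port", tls_port)
    else:
        port = settings.get("stream.advertised_port", tcp_port)
    return host, port
```

With the three keys from the example above, a TLS client would be told to connect to rabbitmq-1 on port 12344, while a plain client would be told to use port 12345.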