WARNING: Filtering requires *RabbitMQ 3.13* or later.
Filtering must also happen on the consuming side because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to consumers.
The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible.
Despite this, the filtering saves some bandwidth, which is its primary goal.
==== Filtering on the Publishing Side
Filtering on the publishing side consists of defining some logic to extract the filter value from a message.
The following snippet shows how to extract the filter value from an application property:
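A minimal sketch, assuming an existing `Environment` instance named `environment`; the `invoices` stream and the `state` application property are illustrative:

[source,java]
----
Producer producer = environment.producerBuilder()
    .stream("invoices")
    // extract the filter value from the "state" application property
    .filterValue(message ->
        String.valueOf(message.getApplicationProperties().get("state")))
    .build();
----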
This section describes the API to connect to the RabbitMQ Stream Plugin, publish messages, and consume messages. There are 3 main interfaces:
* `com.rabbitmq.stream.Environment` to connect to a node and optionally manage streams.
* `com.rabbitmq.stream.Producer` to publish messages.
* `com.rabbitmq.stream.Consumer` to consume messages.
=== Environment

==== Creating the Environment
The environment is the main entry point to a node or a cluster of nodes. `Producer` and `Consumer` instances are created from an `Environment` instance. Here is the simplest way to create one:
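A sketch; the no-argument builder connects to `localhost` on the default stream port (5552) with the default `guest` credentials:

[source,java]
----
Environment environment = Environment.builder().build();
// ... use the environment to create producers and consumers ...
environment.close(); // close the environment when the application stops
----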
By specifying several URIs (`EnvironmentBuilder#uris(List<String>)`), the environment will try to connect to the first one and will pick a new URI randomly in case of disconnection.
[[understanding-connection-logic]]
==== Understanding Connection Logic
Creating the environment to connect to a cluster node usually works seamlessly.
Creating publishers and consumers can cause problems, as the client uses hints from the cluster to find the nodes where stream leaders and replicas are located and to connect to the appropriate nodes.
This happens if the following conditions are met: the initial host to connect to is `localhost` and the default `guest` user is used.
Provide a pass-through `AddressResolver` to `EnvironmentBuilder#addressResolver(AddressResolver)` to avoid this behavior.
It is unlikely this behavior applies to any real-world deployment, where `localhost` and/or the default `guest` user should not be used.
==== Enabling TLS
TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI.
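A sketch, with illustrative credentials and the conventional stream TLS port (5551):

[source,java]
----
Environment environment = Environment.builder()
    .uri("rabbitmq-stream+tls://guest:guest@localhost:5551/%2f")
    .build();
----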
The following table sums up the main settings to create an `Environment`:
It is the developer's responsibility to close the `EventLoopGroup` they provide.
==== When a Load Balancer is in Use
A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas.
The https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/["Connecting to Streams"] blog post covers why client applications must connect to the appropriate nodes in a cluster and how a https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer[load balancer can make things complicated] for them.
The blog post covers the https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround].
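The workaround makes the client always connect through the load balancer, ignoring the node hints. A sketch; the `my-load-balancer` host is illustrative:

[source,java]
----
Address entryPoint = new Address("my-load-balancer", 5552);
Environment environment = Environment.builder()
    .host(entryPoint.host())
    .port(entryPoint.port())
    // always return the load balancer address, whatever node the client asks for
    .addressResolver(address -> entryPoint)
    .build();
----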
==== Managing Streams
Streams are usually long-lived, centrally-managed entities, that is, applications are not supposed to create and delete them. It is nevertheless possible to create and delete streams with the `Environment`, as shown in the sketch below.
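A sketch; the stream name is illustrative:

[source,java]
----
// create the stream (idempotent if the stream already exists with the same properties)
environment.streamCreator().stream("my-stream").create();
// ...
// delete the stream
environment.deleteStream("my-stream");
----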
RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching.
This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled.
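To give an idea of the API involved, here is a sketch that enables sub-entry batching with compression; the stream name and sub-entry size are illustrative:

[source,java]
----
import com.rabbitmq.stream.compression.Compression;

Producer producer = environment.producerBuilder()
    .stream("my-stream")
    .subEntrySize(10)              // publish messages in sub-entries of 10
    .compression(Compression.ZSTD) // compress each sub-entry
    .build();
----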
The broker dispatches messages to client libraries: they are supposed to figure out the format of the messages and handle them appropriately.
So when you set up sub-entry batching and compression in your publishers, the consuming applications must use client libraries that support this mode, which is the case for the stream Java client.
=== Consumer
`Consumer` is the API to consume messages from a stream.
==== Creating a Consumer
A `Consumer` instance is created with `Environment#consumerBuilder()`. The main settings are the stream to consume from, the place in the stream to start consuming from, and the callback to process messages, as in the sketch below.
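A sketch with the 3 main settings; the stream name and processing logic are illustrative:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")                  // the stream to consume from
    .offset(OffsetSpecification.first())  // where to start in the stream
    .messageHandler((context, message) -> // the processing callback
        System.out.println(new String(message.getBodyAsBinary())))
    .build();
----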
The following sections cover the different types of offset specification.
[[specifying-an-offset]]
==== Specifying an Offset
The offset is the place in the stream where the consumer starts consuming from.
The possible values for the offset parameter are the following:
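They map to factory methods of the `OffsetSpecification` class; a sketch of each (the literal values are illustrative):

[source,java]
----
OffsetSpecification.first();     // start from the first available offset
OffsetSpecification.last();      // start from the last chunk of the stream
OffsetSpecification.next();      // start from the next offset to be written
OffsetSpecification.offset(42);  // start from an absolute offset
OffsetSpecification.timestamp(   // start from a point in time, in milliseconds
    System.currentTimeMillis() - 60 * 60 * 1_000); // e.g. 1 hour ago
----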
For the timestamp specification, this is the timestamp the broker uses to find the appropriate chunk to start from.
The broker chooses the closest chunk _before_ the specified timestamp, which is why consumers may see messages published a bit before the timestamp they specified.

RabbitMQ Stream provides server-side offset tracking for consumers.
This means a consumer can track the offset it has reached in a stream.
The way offsets are stored depends on the tracking strategy: automatic or manual.
Whatever tracking strategy you use, *a consumer must have a name to be able to store offsets*.
[[consumer-automatic-offset-tracking]]
===== Automatic Offset Tracking
The following snippet shows how to enable automatic tracking with the defaults:
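A sketch, with an illustrative stream and consumer name:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")   // the name used to store offsets
    .autoTrackingStrategy()  // automatic tracking with the defaults
    .builder()               // go back to the consumer builder
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();
----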
It is possible to have more fine-grained control over offset tracking by using manual tracking.
[[consumer-manual-offset-tracking]]
===== Manual Offset Tracking
The manual tracking strategy puts the developer in charge of storing offsets whenever they want, not only after a given number of messages has been received.
`MessageHandler.Context#storeOffset()` stores the offset of the current message, but it is possible to store anywhere in the stream with `MessageHandler.Context#consumer()#store(long)` or simply `Consumer#store(long)`.
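A sketch; the `shouldStore()` condition is hypothetical application logic:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")     // the name used to store offsets
    .manualTrackingStrategy()  // enable manual tracking
    .builder()
    .messageHandler((context, message) -> {
        // message processing
        if (shouldStore()) {       // hypothetical application condition
            context.storeOffset(); // store the offset of the current message
        }
    })
    .build();
----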
===== Considerations On Offset Tracking
_When to store offsets?_ Avoid storing offsets too often or, worse, for each message.
Even though offset tracking is a small and fast operation, it will add unnecessary load to the broker if performed too often. A common technique is to use a modulo to perform an operation every X messages. As the message offsets have no guarantee to be contiguous, the operation may not happen exactly every X messages; the sketch below uses a message counter instead.
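A sketch counting received messages; the 10,000 threshold and the names are illustrative:

[source,java]
----
import java.util.concurrent.atomic.AtomicLong;

AtomicLong receivedMessages = new AtomicLong(0);
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")
    .manualTrackingStrategy()
    .builder()
    .messageHandler((context, message) -> {
        // message processing
        if (receivedMessages.incrementAndGet() % 10_000 == 0) {
            context.storeOffset(); // store roughly every 10,000 messages
        }
    })
    .build();
----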
[[consumer-subscription-listener]]
===== Subscription Listener
The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
This callback can be used to customize the offset the client library computed for the subscription.
When a glitch happens and triggers a re-subscription, the server-side stored offset may lag behind what the application has actually processed.
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
A custom `SubscriptionListener` lets the application developer use what's best for the application if the computed value is not optimal.
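A sketch of a `SubscriptionListener` that overrides the computed offset with a value from a hypothetical external store; the `offsetFromExternalStore()` helper is illustrative:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .subscriptionListener(subscriptionContext -> {
        long offset = offsetFromExternalStore(); // hypothetical lookup
        // override the offset computed by the client library
        subscriptionContext.offsetSpecification(
            OffsetSpecification.offset(offset + 1));
    })
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();
----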
==== Flow Control
This section covers how a consumer can tell the broker when to send more messages.
A consumer signals that a message has been processed by calling `MessageHandler.Context#processed()`. Whether the method is idempotent depends on the flow strategy implementation. Apart from the default one, the implementations the library provides do not make `processed()` idempotent.
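A sketch, assuming the `ConsumerBuilder#flow()` configuration entry point; the initial credits value is illustrative:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .flow()
      .initialCredits(10) // chunks the broker can send before waiting for credits
    .builder()
    .messageHandler((context, message) -> {
        // ... message processing ...
        context.processed(); // signal the message has been processed
    })
    .build();
----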
[[single-active-consumer]]
==== Single Active Consumer
WARNING: Single Active Consumer requires *RabbitMQ 3.11* or later.
We end up with 3 `app-1` consumers and 3 `app-2` consumers, and 1 active consumer in each group.
Let's now look at the API for single active consumer.
===== Enabling Single Active Consumer
Use the `ConsumerBuilder#singleActiveConsumer()` method to enable the feature:
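A sketch; the stream and consumer names are illustrative (a consumer name is required for the feature):

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")  // the consumer name defines the group
    .singleActiveConsumer() // enable single active consumer
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();
----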
With the configuration above, the consumer will take part in the `application-1` group on the `my-stream` stream.
If the consumer instance is the first in a group, it will get messages as soon as there are some available. If it is not the first in the group, it will remain idle until it is its turn to be active (likely when all the instances registered before it are gone).
===== Offset Tracking
Single active consumer and offset tracking work together: when the active consumer goes away, another consumer takes over and resumes where the former active one left off.
Well, this is how things should work and luckily this is what happens when using <<consumer-offset-tracking, server-side offset tracking>>.
The story is different if you are using an external store for offset tracking.
In this case you need to tell the client library where to resume from, and you can do this by implementing the `ConsumerUpdateListener` API.
[[consumer-update-listener]]
===== Reacting to Consumer State Change
The broker notifies a consumer that becomes active before dispatching messages to it.
The broker expects a response from the consumer and this response contains the offset the dispatching should start from.
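A sketch of a `ConsumerUpdateListener` that reads the offset from a hypothetical external store and returns the offset specification to resume from; the `offsetFromExternalStore()` helper is illustrative:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")
    .singleActiveConsumer()
    .noTrackingStrategy() // offsets are tracked in an external store
    .consumerUpdateListener(context -> {
        long offset = offsetFromExternalStore(); // hypothetical lookup
        // resume right after the last processed message
        return OffsetSpecification.offset(offset + 1);
    })
    .messageHandler((context, message) -> {
        // message processing, storing offsets in the external store
    })
    .build();
----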
The client contains 2 sets of programming interfaces whose stability is of interest for application developers:
* Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the `com.rabbitmq.stream` package (e.g. `Producer`, `Consumer`, `Message`). These APIs constitute the main programming model of the client and will be kept as stable as possible.
* Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior in the client. SPI include interfaces and classes in the `com.rabbitmq.stream.codec`, `com.rabbitmq.stream.compression`, `com.rabbitmq.stream.metrics` packages, among others. _These SPI are susceptible to change, but this should not impact the majority of applications_, as the changes would typically stay internal to the client.
== Pre-requisites
The library requires Java 8 or later. Java 11 is recommended (CRC calculation uses methods available as of Java 9).