File tree Expand file tree Collapse file tree 1 file changed +31
-0
lines changed
src/test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 1 file changed +31
-0
lines changed Original file line number Diff line number Diff line change 1515
1616import static com .rabbitmq .stream .ConsumerFlowStrategy .creditWhenHalfMessagesProcessed ;
1717import static com .rabbitmq .stream .impl .TestUtils .*;
18+ import static com .rabbitmq .stream .impl .TestUtils .CountDownLatchConditions .completed ;
19+ import static java .lang .Runtime .getRuntime ;
1820import static java .lang .String .format ;
1921import static java .util .Collections .synchronizedList ;
2022import static org .assertj .core .api .Assertions .*;
@@ -236,6 +238,35 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
236238 consumer .close ();
237239 }
238240
241+ @ Test
242+ void asynchronousProcessingWithFlowControl () {
243+ int messageCount = 100_000 ;
244+ TestUtils .publishAndWaitForConfirms (cf , messageCount , stream );
245+
246+ ExecutorService executorService =
247+ Executors .newFixedThreadPool (getRuntime ().availableProcessors ());
248+ try {
249+ CountDownLatch latch = new CountDownLatch (messageCount );
250+ environment .consumerBuilder ().stream (stream )
251+ .offset (OffsetSpecification .first ())
252+ .flow ()
253+ .strategy (creditWhenHalfMessagesProcessed ())
254+ .builder ()
255+ .messageHandler (
256+ (ctx , message ) -> {
257+ executorService .submit (
258+ () -> {
259+ latch .countDown ();
260+ ctx .processed ();
261+ });
262+ })
263+ .build ();
264+ assertThat (latch ).is (completed ());
265+ } finally {
266+ executorService .shutdownNow ();
267+ }
268+ }
269+
239270 @ Test
240271 void closeOnCondition () throws Exception {
241272 int messageCount = 50_000 ;
You can’t perform that action at this time.
0 commit comments