2121import static com .rabbitmq .stream .impl .Utils .namedRunnable ;
2222import static com .rabbitmq .stream .impl .Utils .quote ;
2323
24- import com .rabbitmq .stream .BackOffDelayPolicy ;
25- import com .rabbitmq .stream .Constants ;
24+ import com .rabbitmq .stream .*;
2625import com .rabbitmq .stream .Consumer ;
27- import com .rabbitmq .stream .MessageHandler ;
2826import com .rabbitmq .stream .MessageHandler .Context ;
29- import com .rabbitmq .stream .OffsetSpecification ;
30- import com .rabbitmq .stream .StreamDoesNotExistException ;
31- import com .rabbitmq .stream .StreamException ;
32- import com .rabbitmq .stream .StreamNotAvailableException ;
33- import com .rabbitmq .stream .SubscriptionListener ;
3427import com .rabbitmq .stream .SubscriptionListener .SubscriptionContext ;
3528import com .rabbitmq .stream .impl .Client .Broker ;
3629import com .rabbitmq .stream .impl .Client .ChunkListener ;
6154import java .util .concurrent .atomic .AtomicBoolean ;
6255import java .util .concurrent .atomic .AtomicLong ;
6356import java .util .concurrent .atomic .AtomicReference ;
64- import java .util .function .Function ;
65- import java .util .function .Predicate ;
57+ import java .util .function .*;
6658import java .util .stream .Collectors ;
6759import java .util .stream .IntStream ;
6860import org .slf4j .Logger ;
@@ -122,8 +114,7 @@ Runnable subscribe(
122114 Runnable trackingClosingCallback ,
123115 MessageHandler messageHandler ,
124116 Map <String , String > subscriptionProperties ,
125- int initialCredits ,
126- int additionalCredits ) {
117+ ConsumerFlowStrategy flowStrategy ) {
127118 List <Client .Broker > candidates = findBrokersForStream (stream );
128119 Client .Broker newNode = pickBroker (candidates );
129120 if (newNode == null ) {
@@ -143,8 +134,7 @@ Runnable subscribe(
143134 trackingClosingCallback ,
144135 messageHandler ,
145136 subscriptionProperties ,
146- initialCredits ,
147- additionalCredits );
137+ flowStrategy );
148138
149139 try {
150140 addToManager (newNode , subscriptionTracker , offsetSpecification , true );
@@ -403,8 +393,7 @@ private static class SubscriptionTracker {
403393 private volatile ClientSubscriptionsManager manager ;
404394 private volatile AtomicReference <SubscriptionState > state =
405395 new AtomicReference <>(SubscriptionState .OPENING );
406- private final int initialCredits ;
407- private final int additionalCredits ;
396+ private final ConsumerFlowStrategy flowStrategy ;
408397
409398 private SubscriptionTracker (
410399 long id ,
@@ -416,8 +405,7 @@ private SubscriptionTracker(
416405 Runnable trackingClosingCallback ,
417406 MessageHandler messageHandler ,
418407 Map <String , String > subscriptionProperties ,
419- int initialCredits ,
420- int additionalCredits ) {
408+ ConsumerFlowStrategy flowStrategy ) {
421409 this .id = id ;
422410 this .consumer = consumer ;
423411 this .stream = stream ;
@@ -426,8 +414,7 @@ private SubscriptionTracker(
426414 this .subscriptionListener = subscriptionListener ;
427415 this .trackingClosingCallback = trackingClosingCallback ;
428416 this .messageHandler = messageHandler ;
429- this .initialCredits = initialCredits ;
430- this .additionalCredits = additionalCredits ;
417+ this .flowStrategy = flowStrategy ;
431418 if (this .offsetTrackingReference == null ) {
432419 this .subscriptionProperties = subscriptionProperties ;
433420 } else {
@@ -497,13 +484,19 @@ private static final class MessageHandlerContext implements Context {
497484 private final long timestamp ;
498485 private final long committedOffset ;
499486 private final StreamConsumer consumer ;
487+ private final LongConsumer processedCallback ;
500488
501489 private MessageHandlerContext (
502- long offset , long timestamp , long committedOffset , StreamConsumer consumer ) {
490+ long offset ,
491+ long timestamp ,
492+ long committedOffset ,
493+ StreamConsumer consumer ,
494+ LongConsumer processedCallback ) {
503495 this .offset = offset ;
504496 this .timestamp = timestamp ;
505497 this .committedOffset = committedOffset ;
506498 this .consumer = consumer ;
499+ this .processedCallback = processedCallback ;
507500 }
508501
509502 @ Override
@@ -534,6 +527,11 @@ public String stream() {
534527 public Consumer consumer () {
535528 return this .consumer ;
536529 }
530+
531+ @ Override
532+ public void processed () {
533+ this .processedCallback .accept (this .offset );
534+ }
537535 }
538536
539537 /**
@@ -569,13 +567,28 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
569567 (client , subscriptionId , offset , messageCount , dataSize ) -> {
570568 SubscriptionTracker subscriptionTracker =
571569 subscriptionTrackers .get (subscriptionId & 0xFF );
570+ LongConsumer processCallback ;
572571 if (subscriptionTracker != null && subscriptionTracker .consumer .isOpen ()) {
573- client .credit (subscriptionId , subscriptionTracker .additionalCredits );
572+ ConsumerFlowStrategy .Context chunkContext =
573+ new ConsumerFlowStrategy .Context () {
574+ @ Override
575+ public void credits (int credits ) {
576+ client .credit (subscriptionId , 1 );
577+ }
578+
579+ @ Override
580+ public long messageCount () {
581+ return messageCount ;
582+ }
583+ };
584+ processCallback = subscriptionTracker .flowStrategy .start (chunkContext );
574585 } else {
575586 LOGGER .debug (
576587 "Could not find stream subscription {} or subscription closing, not providing credits" ,
577588 subscriptionId & 0xFF );
589+ processCallback = null ;
578590 }
591+ return processCallback ;
579592 };
580593
581594 CreditNotification creditNotification =
@@ -591,17 +604,20 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
591604 };
592605
593606 MessageListener messageListener =
594- (subscriptionId , offset , chunkTimestamp , committedOffset , message ) -> {
607+ (subscriptionId , offset , chunkTimestamp , committedOffset , chunkContext , message ) -> {
595608 SubscriptionTracker subscriptionTracker =
596609 subscriptionTrackers .get (subscriptionId & 0xFF );
597610 if (subscriptionTracker != null ) {
598611 subscriptionTracker .offset = offset ;
599612 subscriptionTracker .hasReceivedSomething = true ;
600613 subscriptionTracker .messageHandler .handle (
601614 new MessageHandlerContext (
602- offset , chunkTimestamp , committedOffset , subscriptionTracker .consumer ),
615+ offset ,
616+ chunkTimestamp ,
617+ committedOffset ,
618+ subscriptionTracker .consumer ,
619+ (LongConsumer ) chunkContext ),
603620 message );
604- // FIXME set offset here as well, best effort to avoid duplicates?
605621 } else {
606622 LOGGER .debug (
607623 "Could not find stream subscription {} in manager {}, node {}" ,
@@ -969,7 +985,7 @@ synchronized void add(
969985 subId ,
970986 subscriptionTracker .stream ,
971987 subscriptionContext .offsetSpecification (),
972- subscriptionTracker .initialCredits ,
988+ subscriptionTracker .flowStrategy . initialCredits () ,
973989 subscriptionTracker .subscriptionProperties ),
974990 RETRY_ON_TIMEOUT ,
975991 "Subscribe request for consumer %s on stream '%s'" ,
0 commit comments