File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed
Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -343,7 +343,7 @@ class Consumer {
343343 try {
344344 if ( err . code === LibrdKafkaError . codes . ERR__ASSIGN_PARTITIONS ) {
345345
346- if ( this . #checkPendingSeeks)
346+ if ( this . #checkPendingSeeks && this . #kafkaJSCompatibilityMode )
347347 assignment = this . #assignAsPerSeekedOffsets( assignment ) ;
348348
349349 this . #internalClient. assign ( assignment ) ;
@@ -1099,9 +1099,10 @@ class Consumer {
10991099 this . #checkPendingSeeks = true ;
11001100 this . #pendingSeeks. set ( `${ rdKafkaTopicPartitionOffset . topic } |${ rdKafkaTopicPartitionOffset . partition } ` , rdKafkaTopicPartitionOffset . offset ) ;
11011101
1102- /* Immediately realize the seek if we're not in compatibility mode. */
1102+ /* Immediately realize the seek if we're not in compatibility mode. And clear pending seeks.
1103+ * We don't need them for rebalance. */
11031104 if ( ! this . #kafkaJSCompatibilityMode) {
1104- return this . #seekInternal( ) ;
1105+ return this . #seekInternal( ) . then ( ( ) => this . #pendingSeeks . clear ( ) ) ;
11051106 }
11061107 }
11071108
You can’t perform that action at this time.
0 commit comments