@@ -213,18 +213,17 @@ class Consumer {
213213 */
214214 #running = false ;
215215
216- /**
217- * Whether the consumer is in KafkaJS compatibility mode.
218- * @type {boolean }
219- */
220- #kafkaJSCompatibilityMode = false ;
221-
222216 /**
223217 * The message cache for KafkaJS compatibility mode.
224218 * @type {MessageCache|null }
225219 */
226220 #messageCache = null ;
227221
222+ /**
223+ * Whether the user has enabled manual offset management (stores).
224+ */
225+ #userManagedStores = false ;
226+
228227 /**
229228 * @constructor
230229 * @param {import("../../types/kafkajs").ConsumerConfig } kJSConfig
@@ -343,12 +342,13 @@ class Consumer {
343342 try {
344343 if ( err . code === LibrdKafkaError . codes . ERR__ASSIGN_PARTITIONS ) {
345344
346- if ( this . #checkPendingSeeks && this . #kafkaJSCompatibilityMode)
345+ const checkPendingSeeks = this . #pendingSeeks. size !== 0 ;
346+ if ( checkPendingSeeks )
347347 assignment = this . #assignAsPerSeekedOffsets( assignment ) ;
348348
349349 this . #internalClient. assign ( assignment ) ;
350350
351- if ( this . # checkPendingSeeks) {
351+ if ( checkPendingSeeks ) {
352352 const offsetsToCommit = assignment
353353 . filter ( ( topicPartition ) => topicPartition . offset !== undefined )
354354 . map ( ( topicPartition ) => ( {
@@ -491,11 +491,6 @@ class Consumer {
491491 throw new error . KafkaJSError ( CompatibilityErrorMessages . runOptionsAutoCommitThreshold ( ) , { code : error . ErrorCodes . ERR__NOT_IMPLEMENTED } ) ;
492492 }
493493
494- /* Offset storage management is manual in kafkaJS compability mode if auto-commit is turned on (ie by default). */
495- if ( rdKafkaConfig [ 'enable.auto.commit' ] ) {
496- rdKafkaConfig [ 'enable.auto.offset.store' ] = false ;
497- }
498-
499494 /* Set the logger */
500495 if ( Object . hasOwn ( kjsConfig , 'logger' ) ) {
501496 this . #logger = kjsConfig . logger ;
@@ -517,8 +512,6 @@ class Consumer {
517512 * log level, as librdkafka will control the granularity. */
518513 if ( ! compatibleConfig || Object . keys ( compatibleConfig ) . length === 0 ) {
519514 this . #logger. setLogLevel ( logLevel . DEBUG ) ;
520- } else {
521- this . #kafkaJSCompatibilityMode = true ;
522515 }
523516
524517 /* Even if we are in compability mode, setting a 'debug' in the main config must override the logger's level. */
@@ -542,6 +535,21 @@ class Consumer {
542535 }
543536 rdKafkaConfig [ 'rebalance_cb' ] = this . #rebalanceCallback. bind ( this ) ;
544537
538+ /* Offset management is different from case to case.
539+ * Case 1: User has changed value of enable.auto.offset.store. In this case, we respect that.
540+ * Case 2: automatic committing is on. In this case, we turn off auto.offset.store and store offsets manually.
541+ * this is necessary for cache invalidation and management, as we want to put things into the store
542+ * after eachMessage is called, and not on consume itself.
543+ * Case 3: automatic committing is off. In this case, we turn off auto.offset.store too. Since the user might
544+ * call an empty commit() and expect things to work properly (ie. the right offsets be stored).
545+ * All this works out a singular, simple condition.
546+ */
547+ if ( ! Object . hasOwn ( this . #userConfig, 'enable.auto.offset.store' ) ) {
548+ rdKafkaConfig [ 'enable.auto.offset.store' ] = false ;
549+ } else {
550+ this . #userManagedStores = ! rdKafkaConfig [ 'enable.auto.offset.store' ] ;
551+ }
552+
545553 return rdKafkaConfig ;
546554 }
547555
@@ -699,10 +707,7 @@ class Consumer {
699707 }
700708
701709 const rdKafkaConfig = this . #config( ) ;
702-
703- if ( this . #kafkaJSCompatibilityMode) {
704- this . #messageCache = new MessageCache ( Math . floor ( rdKafkaConfig [ 'max.poll.interval.ms' ] * 0.8 ) ) ;
705- }
710+ this . #messageCache = new MessageCache ( Math . floor ( rdKafkaConfig [ 'max.poll.interval.ms' ] * 0.8 ) ) ;
706711
707712 this . #state = ConsumerState . CONNECTING ;
708713 this . #internalClient = new RdKafka . KafkaConsumer ( rdKafkaConfig ) ;
@@ -804,10 +809,6 @@ class Consumer {
804809 throw new error . KafkaJSError ( CompatibilityErrorMessages . runOptionsPartitionsConsumedConcurrently ( ) , { code : error . ErrorCodes . ERR__NOT_IMPLEMENTED } ) ;
805810 }
806811
807- if ( ! this . #kafkaJSCompatibilityMode) {
808- throw new error . KafkaJSError ( 'run() can only be used in KafkaJS compatibility mode.' , { code : error . ErrorCodes . ERR__NOT_IMPLEMENTED } ) ;
809- }
810-
811812 if ( this . #running) {
812813 throw new error . KafkaJSError ( 'Consumer is already running.' , { code : error . ErrorCodes . ERR__STATE } ) ;
813814 }
@@ -829,7 +830,7 @@ class Consumer {
829830 /* Invalidate the message cache if needed. */
830831 if ( this . #messageCache. isStale ( ) ) {
831832 await this . #clearCacheAndResetPositions( true ) ;
832- this . #lock. release ( ) ;
833+ await this . #lock. release ( ) ;
833834 continue ;
834835 }
835836
@@ -841,7 +842,7 @@ class Consumer {
841842 } ) ;
842843
843844 if ( ! m ) {
844- this . #lock. release ( ) ;
845+ await this . #lock. release ( ) ;
845846 continue ;
846847 }
847848
@@ -876,7 +877,7 @@ class Consumer {
876877 * This is especially true since the pattern of pause() followed by throwing an error
877878 * is encouraged. To meet the API contract, we seek one offset backward at this point (which
878879 * means seeking to the message offset). */
879- this . seek ( {
880+ await this . seek ( {
880881 topic : m . topic ,
881882 partition : m . partition ,
882883 offset : m . offset ,
@@ -886,7 +887,7 @@ class Consumer {
886887 /* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
887888 if ( eachMessageProcessed ) {
888889 try {
889- if ( this . #internalConfig [ 'enable.auto.commit' ] ) {
890+ if ( ! this . #userManagedStores ) {
890891 this . #internalClient. offsetsStore ( [ { topic : m . topic , partition : m . partition , offset : Number ( m . offset ) + 1 } ] ) ;
891892 }
892893 this . #lastConsumedOffsets. set ( `${ m . topic } |${ m . partition } ` , Number ( m . offset ) + 1 ) ;
@@ -916,14 +917,12 @@ class Consumer {
916917
917918 /**
918919 * Consumes a single message from the consumer within the given timeout.
920+ * THIS METHOD IS NOT IMPLEMENTED.
919921 * @note This method cannot be used with run(). Either that, or this must be used.
920922 *
921923 * @param {any } args
922924 * @param {number } args.timeout - the timeout in milliseconds, defaults to 1000.
923925 * @returns {import("../..").Message|null } a message, or null if the timeout was reached.
924- *
925- * @note This API is currently in an experimental stage and subject to change.
926- * This should not be used in KafkaJS compatibility mode (ie with kafkaJS blocks in the config).
927926 */
928927 async consume ( { timeout } = { timeout : 1000 } ) {
929928 if ( this . #state !== ConsumerState . CONNECTED ) {
@@ -934,10 +933,6 @@ class Consumer {
934933 throw new error . KafkaJSError ( 'consume() and run() cannot be used together.' , { code : error . ErrorCodes . ERR__CONFLICT } ) ;
935934 }
936935
937- if ( this . #kafkaJSCompatibilityMode) {
938- throw new error . KafkaJSError ( 'consume() cannot be used in KafkaJS compatibility mode.' , { code : error . ErrorCodes . ERR__NOT_IMPLEMENTED } ) ;
939- }
940-
941936 this . #internalClient. setDefaultConsumeTimeout ( timeout ) ;
942937 let m = null ;
943938
@@ -948,7 +943,8 @@ class Consumer {
948943 this . #internalClient. setDefaultConsumeTimeout ( undefined ) ;
949944 }
950945
951- return m ?? null ;
946+ throw new error . KafkaJSError ( 'consume() is not implemented.' + m , { code : error . ErrorCodes . ERR__NOT_IMPLEMENTED } ) ;
947+ // return m ?? null;
952948 }
953949
954950 async #commitOffsetsUntilNoStateErr( offsetsToCommit ) {
@@ -1043,8 +1039,7 @@ class Consumer {
10431039 /* We need a complete reset of the cache if we're seeking to a different offset even for one partition.
10441040 * At a later point, this may be improved at the cost of added complexity of maintaining message generation,
10451041 * or else purging the cache of just those partitions which are seeked. */
1046- if ( this . #kafkaJSCompatibilityMode)
1047- await this . #clearCacheAndResetPositions( true ) ;
1042+ await this . #clearCacheAndResetPositions( true ) ;
10481043
10491044 /* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
10501045 * Errors are logged to detect bugs in the internal code. */
@@ -1064,7 +1059,7 @@ class Consumer {
10641059 }
10651060
10661061 /* Offsets are committed on seek only when in compatibility mode. */
1067- if ( offsetsToCommit . length !== 0 && this . #internalConfig[ 'enable.auto.commit' ] && this . #kafkaJSCompatibilityMode ) {
1062+ if ( offsetsToCommit . length !== 0 && this . #internalConfig[ 'enable.auto.commit' ] ) {
10681063 await this . #commitOffsetsUntilNoStateErr( offsetsToCommit ) ;
10691064 }
10701065
@@ -1078,7 +1073,7 @@ class Consumer {
10781073 * If at any time, the consumer is assigned the partition, the seek will be performed.
10791074 * Depending on the value of the librdkafka property 'enable.auto.commit', the consumer will commit the offset seeked to.
10801075 * @param {import("../../types/kafkajs").TopicPartitionOffset } topicPartitionOffset
1081- * @returns {Promise<void>|null } a promise that resolves when the seek has been performed (only when not in compatibility mode), or null (when in compatibility mode)
1076+ * @returns {Promise<void>|null } a promise that resolves when the seek has been performed.
10821077 */
10831078 seek ( topicPartitionOffset ) {
10841079 if ( this . #state !== ConsumerState . CONNECTED ) {
@@ -1098,12 +1093,6 @@ class Consumer {
10981093
10991094 this . #checkPendingSeeks = true ;
11001095 this . #pendingSeeks. set ( `${ rdKafkaTopicPartitionOffset . topic } |${ rdKafkaTopicPartitionOffset . partition } ` , rdKafkaTopicPartitionOffset . offset ) ;
1101-
1102- /* Immediately realize the seek if we're not in compatibility mode. And clear pending seeks.
1103- * We don't need them for rebalance. */
1104- if ( ! this . #kafkaJSCompatibilityMode) {
1105- return this . #seekInternal( ) . then ( ( ) => this . #pendingSeeks. clear ( ) ) ;
1106- }
11071096 }
11081097
11091098 async describeGroup ( ) {
@@ -1160,8 +1149,7 @@ class Consumer {
11601149 return ;
11611150 }
11621151 this . #internalClient. pause ( topics ) ;
1163- if ( this . #kafkaJSCompatibilityMode)
1164- this . #messageCache. stale = true ;
1152+ this . #messageCache. stale = true ;
11651153
11661154 topics . map ( JSON . stringify ) . forEach ( topicPartition => this . #pausedPartitions. add ( topicPartition ) ) ;
11671155
0 commit comments