1+ const LibrdKafkaError = require ( '../error' ) ;
12const RdKafka = require ( '../rdkafka' ) ;
23const { kafkaJSToRdKafkaConfig } = require ( './_common' ) ;
34
@@ -29,7 +30,7 @@ class Consumer {
2930 async #finalizedConfig( ) {
3031 const config = await kafkaJSToRdKafkaConfig ( this . #kJSConfig) ;
3132 if ( this . #kJSConfig. groupId != null ) {
32- config [ " group.id" ] = this . #kJSConfig. groupId ;
33+ config [ ' group.id' ] = this . #kJSConfig. groupId ;
3334 }
3435 config [ 'offset_commit_cb' ] = true ;
3536 return config ;
@@ -43,20 +44,20 @@ class Consumer {
4344 this . #state = ConsumerState . CONNECTED ;
4445
4546 // Resolve the promise.
46- this . #connectPromiseFunc[ " resolve" ] ( ) ;
47+ this . #connectPromiseFunc[ ' resolve' ] ( ) ;
4748 }
4849
4950 #errorCb( args ) {
5051 console . log ( 'error' , args ) ;
5152 if ( this . #state === ConsumerState . CONNECTING ) {
52- this . #connectPromiseFunc[ " reject" ] ( args ) ;
53+ this . #connectPromiseFunc[ ' reject' ] ( args ) ;
5354 } else {
5455 // do nothing for now.
5556 }
5657 }
5758
5859 #notImplemented( ) {
59- throw new Error ( " Not implemented" ) ;
60+ throw new Error ( ' Not implemented' ) ;
6061 }
6162
6263 #createPayload( message ) {
@@ -66,7 +67,7 @@ class Consumer {
6667 }
6768
6869 let timestamp = message . timestamp ? new Date ( message . timestamp ) . toISOString ( )
69- : "" ;
70+ : '' ;
7071
7172 var headers = undefined ;
7273 if ( message . headers ) {
@@ -113,9 +114,17 @@ class Consumer {
113114 } ) ;
114115 }
115116
117+ #topicPartitionOffsetToRdKafka( tpo ) {
118+ return {
119+ topic : tpo . topic ,
120+ partition : tpo . partition ,
121+ offset : Number ( tpo . offset ) ,
122+ }
123+ }
124+
116125 async connect ( ) {
117126 if ( this . #state !== ConsumerState . INIT ) {
118- return Promise . reject ( " Connect has already been called elsewhere." ) ;
127+ return Promise . reject ( ' Connect has already been called elsewhere.' ) ;
119128 }
120129
121130 this . #state = ConsumerState . CONNECTING ;
@@ -126,9 +135,9 @@ class Consumer {
126135
127136 return new Promise ( ( resolve , reject ) => {
128137 this . #connectPromiseFunc = { resolve, reject} ;
129- console . log ( " Connecting...." ) ;
138+ console . log ( ' Connecting....' ) ;
130139 this . #internalClient. connect ( ) ;
131- console . log ( " connect() called" ) ;
140+ console . log ( ' connect() called' ) ;
132141 } ) ;
133142 }
134143
@@ -142,7 +151,7 @@ class Consumer {
142151
143152 async run ( config ) {
144153 if ( this . #state !== ConsumerState . CONNECTED ) {
145- throw new Error ( " Run must be called in state CONNECTED." ) ;
154+ throw new Error ( ' Run must be called in state CONNECTED.' ) ;
146155 }
147156
148157 while ( this . #state === ConsumerState . CONNECTED ) {
@@ -156,21 +165,34 @@ class Consumer {
156165 }
157166
158167 commitOffsets ( topicPartitions = null ) {
159- if ( topicPartitions == null ) {
160- this . #internalClient. commitSync ( ) ;
161- } else {
162- const topicPartitions = topicPartitions . map ( ( tpo ) => ( {
163- topic : tpo . topic ,
164- partition : tpo . partition ,
165- offset : Number ( tpo . offset ) ,
166- } ) )
167- this . #internalClient. commitSync ( topicPartitions ) ;
168+ try {
169+ if ( topicPartitions == null ) {
170+ this . #internalClient. commitSync ( ) ;
171+ } else {
172+ const topicPartitions = topicPartitions . map (
173+ this . #topicPartitionOffsetToRdKafka) ;
174+ this . #internalClient. commitSync ( topicPartitions ) ;
175+ }
176+ } catch ( e ) {
177+ if ( ! e . code || e . code != LibrdKafkaError . codes . ERR__NO_OFFSET ) {
178+ throw e ;
179+ }
168180 }
169181 return Promise . resolve ( )
170182 }
171183
172184 seek ( topicPartitionOffset ) {
173- this . #notImplemented( ) ;
185+ return new Promise ( ( resolve , reject ) => {
186+ const rdKafkaTopicPartitionOffset =
187+ this . #topicPartitionOffsetToRdKafka( topicPartitionOffset ) ;
188+ this . #internalClient. seek ( rdKafkaTopicPartitionOffset , 0 , ( err ) => {
189+ if ( err ) {
190+ reject ( new Error ( `Seek error code ${ err . code } ` ) ) ;
191+ } else {
192+ resolve ( ) ;
193+ }
194+ } ) ;
195+ } ) . catch ( console . error ) ; // Default handler
174196 }
175197
176198 async describeGroup ( ) {
0 commit comments