File tree Expand file tree Collapse file tree 2 files changed +29
-6
lines changed
Expand file tree Collapse file tree 2 files changed +29
-6
lines changed Original file line number Diff line number Diff line change @@ -7,8 +7,9 @@ async function consumerStart() {
77 ssl : true ,
88 sasl : {
99 mechanism : 'plain' ,
10- username : '<fill>' ,
11- password : '<fill>' ,
10+ } ,
11+ rdKafka : {
12+ 'enable.auto.commit' : false
1213 }
1314 } ) ;
1415
@@ -23,6 +24,7 @@ async function consumerStart() {
2324 ]
2425 } )
2526
27+ var batch = 0 ;
2628 consumer . run ( {
2729 eachMessage : async ( { topic, partition, message } ) => {
2830 console . log ( {
@@ -32,13 +34,21 @@ async function consumerStart() {
3234 key : message . key ?. toString ( ) ,
3335 value : message . value . toString ( ) ,
3436 } )
37+
38+ if ( ++ batch % 100 == 0 ) {
39+ await consumer . commitOffsets ( ) ;
40+ batch = 0 ;
41+ }
3542 } ,
3643 } ) ;
3744
38- const disconnect = ( ) =>
45+ const disconnect = ( ) => {
46+ process . off ( 'SIGINT' , disconnect ) ;
47+ process . off ( 'SIGTERM' , disconnect ) ;
3948 consumer . disconnect ( ) . then ( ( ) => {
4049 console . log ( "Disconnected successfully" ) ;
4150 } ) ;
51+ }
4252 process . on ( 'SIGINT' , disconnect ) ;
4353 process . on ( 'SIGTERM' , disconnect ) ;
4454}
Original file line number Diff line number Diff line change @@ -31,6 +31,7 @@ class Consumer {
3131 if ( this . #kJSConfig. groupId != null ) {
3232 config [ "group.id" ] = this . #kJSConfig. groupId ;
3333 }
34+ config [ 'offset_commit_cb' ] = true ;
3435 return config ;
3536 }
3637
@@ -101,8 +102,10 @@ class Consumer {
101102 async #consumeSingle( ) {
102103 return new Promise ( ( resolve , reject ) => {
103104 this . #internalClient. consume ( 1 , function ( err , messages ) {
104- if ( err )
105+ if ( err ) {
105106 reject ( `Consume error code ${ err . code } ` ) ;
107+ return ;
108+ }
106109
107110 const message = messages [ 0 ] ;
108111 resolve ( message ) ;
@@ -152,8 +155,18 @@ class Consumer {
152155 }
153156 }
154157
155- async commitOffsets ( topicPartitions ) {
156- this . #notImplemented( ) ;
158+ commitOffsets ( topicPartitions = null ) {
159+ if ( topicPartitions == null ) {
160+ this . #internalClient. commitSync ( ) ;
161+ } else {
162+ const topicPartitions = topicPartitions . map ( ( tpo ) => ( {
163+ topic : tpo . topic ,
164+ partition : tpo . partition ,
165+ offset : Number ( tpo . offset ) ,
166+ } ) )
167+ this . #internalClient. commitSync ( topicPartitions ) ;
168+ }
169+ return Promise . resolve ( )
157170 }
158171
159172 seek ( topicPartitionOffset ) {
You can’t perform that action at this time.
0 commit comments