@@ -29,10 +29,54 @@ class Consumer {
2929
3030 async #finalizedConfig( ) {
3131 const config = await kafkaJSToRdKafkaConfig ( this . #kJSConfig) ;
32- if ( this . #kJSConfig. groupId != null ) {
32+ if ( this . #kJSConfig. groupId ) {
3333 config [ 'group.id' ] = this . #kJSConfig. groupId ;
3434 }
3535 config [ 'offset_commit_cb' ] = true ;
36+ if ( this . #kJSConfig. rebalanceListener ) {
37+ config [ 'rebalance_cb' ] = ( err , assignment ) => {
38+ // Create the librdkafka error
39+ err = LibrdKafkaError . create ( err ) ;
40+
41+ let call ;
42+ switch ( err . code ) {
43+ case LibrdKafkaError . codes . ERR__ASSIGN_PARTITIONS :
44+ call = ( this . #kJSConfig. rebalanceListener . onPartitionsAssigned ?
45+ this . #kJSConfig. rebalanceListener . onPartitionsAssigned ( assignment ) :
46+ Promise . resolve ( ) ) . catch ( console . error ) ;
47+ break ;
48+ case LibrdKafkaError . codes . ERR__REVOKE_PARTITIONS :
49+ call = ( this . #kJSConfig. rebalanceListener . onPartitionsRevoked ?
50+ this . #kJSConfig. rebalanceListener . onPartitionsRevoked ( assignment ) :
51+ Promise . resolve ( ) ) . catch ( console . error ) ;
52+ break ;
53+ default :
54+ call = Promise . reject ( ) . catch ( ( ) => {
55+ console . error ( `Unexpected rebalanceListener error code ${ err . code } ` ) ;
56+ } ) ;
57+ break ;
58+ }
59+
60+ call
61+ . finally ( ( ) => {
62+ // Emit the event
63+ this . #internalClient. emit ( 'rebalance' , err , assignment ) ;
64+
65+ try {
66+ if ( err . code === LibrdKafkaError . codes . ERR__ASSIGN_PARTITIONS ) {
67+ this . #internalClient. assign ( assignment ) ;
68+ } else {
69+ this . #internalClient. unassign ( ) ;
70+ }
71+ } catch ( e ) {
72+ // Ignore exceptions if we are not connected
73+ if ( this . #internalClient. isConnected ( ) ) {
74+ this . #internalClient. emit ( 'rebalance.error' , e ) ;
75+ }
76+ }
77+ } ) ;
78+ } ;
79+ }
3680 return config ;
3781 }
3882
@@ -164,7 +208,7 @@ class Consumer {
164208 }
165209 }
166210
167- commitOffsets ( topicPartitions = null ) {
211+ async commitOffsets ( topicPartitions = null ) {
168212 try {
169213 if ( topicPartitions == null ) {
170214 this . #internalClient. commitSync ( ) ;
@@ -178,7 +222,6 @@ class Consumer {
178222 throw e ;
179223 }
180224 }
181- return Promise . resolve ( )
182225 }
183226
184227 seek ( topicPartitionOffset ) {
0 commit comments