11const RdKafka = require ( '../rdkafka' ) ;
2- const { kafkaJSToRdKafkaConfig } = require ( './_common' ) ;
2+ const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require ( './_common' ) ;
3+ const { Consumer } = require ( './_consumer' ) ;
34
45const ProducerState = Object . freeze ( {
56 INIT : 0 ,
67 CONNECTING : 1 ,
7- CONNECTED : 2 ,
8- DISCONNECTING : 3 ,
9- DISCONNECTED : 4 ,
8+ INITIALIZING_TRANSACTIONS : 2 ,
9+ INITIALIZED_TRANSACTIONS : 3 ,
10+ CONNECTED : 4 ,
11+ DISCONNECTING : 5 ,
12+ DISCONNECTED : 6 ,
1013} ) ;
1114
1215class Producer {
@@ -15,6 +18,7 @@ class Producer {
1518 #internalClient = null ;
1619 #connectPromiseFunc = { } ;
1720 #state = ProducerState . INIT ;
21+ #ongoingTransaction = false ;
1822
1923 constructor ( kJSConfig ) {
2024 this . #kJSConfig = kJSConfig ;
@@ -29,15 +33,51 @@ class Producer {
2933 async #finalizedConfig( ) {
3034 const config = await kafkaJSToRdKafkaConfig ( this . #kJSConfig) ;
3135 config . dr_cb = 'true' ;
36+
37+ if ( this . #kJSConfig. hasOwnProperty ( 'transactionalId' ) ) {
38+ config [ 'transactional.id' ] = this . #kJSConfig. transactionalId ;
39+ }
40+
3241 return config ;
3342 }
3443
35- #readyCb( arg ) {
36- //console.log('Connected and ready.');
37- if ( this . #state !== ProducerState . CONNECTING ) {
44+ #flattenTopicPartitionOffsets( topics ) {
45+ return topics . flatMap ( topic => {
46+ return topic . partitions . map ( partition => {
47+ return { partition : partition . partition , offset : partition . offset , topic : topic . topic } ;
48+ } )
49+ } )
50+ }
51+
52+ #readyTransactions( err ) {
53+ if ( err ) {
54+ this . #connectPromiseFunc[ "reject" ] ( err ) ;
55+ return ;
56+ }
57+
58+ if ( this . #state !== ProducerState . INITIALIZING_TRANSACTIONS ) {
59+ // FSM impossible state. We should add error handling for
60+ // this later.
61+ return ;
62+ }
63+
64+ this . #state = ProducerState . INITIALIZED_TRANSACTIONS ;
65+ this . #readyCb( null ) ;
66+ }
67+
68+ async #readyCb( arg ) {
69+ if ( this . #state !== ProducerState . CONNECTING && this . #state !== ProducerState . INITIALIZED_TRANSACTIONS ) {
3870 // I really don't know how to handle this now.
3971 return ;
4072 }
73+
74+ let config = await this . #config( ) ;
75+ if ( config . hasOwnProperty ( 'transactional.id' ) && this . #state !== ProducerState . INITIALIZED_TRANSACTIONS ) {
76+ this . #state = ProducerState . INITIALIZING_TRANSACTIONS ;
77+ this . #internalClient. initTransactions ( 5000 /* default: 5s */ , this . #readyTransactions. bind ( this ) ) ;
78+ return ;
79+ }
80+
4181 this . #state = ProducerState . CONNECTED ;
4282
4383 // Start a loop to poll.
@@ -121,6 +161,120 @@ class Producer {
121161 } ) ;
122162 }
123163
164+ async transaction ( ) {
165+ if ( this . #state !== ProducerState . CONNECTED ) {
166+ return Promise . reject ( "Cannot start transaction without awaiting connect()" ) ;
167+ }
168+
169+ if ( this . #ongoingTransaction) {
170+ return Promise . reject ( "Can only start one transaction at a time." ) ;
171+ }
172+
173+ return new Promise ( ( resolve , reject ) => {
174+ this . #internalClient. beginTransaction ( ( err ) => {
175+ if ( err ) {
176+ reject ( err ) ;
177+ return ;
178+ }
179+ this . #ongoingTransaction = true ;
180+
181+ // Resolve with 'this' because we don't need any specific transaction object.
182+ // Just using the producer works since we can only have one transaction
183+ // ongoing for one producer.
184+ resolve ( this ) ;
185+ } ) ;
186+ } ) ;
187+ }
188+
189+ async commit ( ) {
190+ if ( this . #state !== ProducerState . CONNECTED ) {
191+ return Promise . reject ( "Cannot commit without awaiting connect()" ) ;
192+ }
193+
194+ if ( ! this . #ongoingTransaction) {
195+ return Promise . reject ( "Cannot commit, no transaction ongoing." ) ;
196+ }
197+
198+ return new Promise ( ( resolve , reject ) => {
199+ this . #internalClient. commitTransaction ( 5000 /* default: 5000ms */ , err => {
200+ if ( err ) {
201+ // TODO: Do we reset ongoingTransaction here?
202+ reject ( err ) ;
203+ return ;
204+ }
205+ this . #ongoingTransaction = false ;
206+ resolve ( ) ;
207+ } ) ;
208+ } ) ;
209+ }
210+
211+
212+ async abort ( ) {
213+ if ( this . #state !== ProducerState . CONNECTED ) {
214+ return Promise . reject ( "Cannot abort without awaiting connect()" ) ;
215+ }
216+
217+ if ( ! this . #ongoingTransaction) {
218+ return Promise . reject ( "Cannot abort, no transaction ongoing." ) ;
219+ }
220+
221+ return new Promise ( ( resolve , reject ) => {
222+ this . #internalClient. abortTransaction ( 5000 /* default: 5000ms */ , err => {
223+ if ( err ) {
224+ // TODO: Do we reset ongoingTransaction here?
225+ reject ( err ) ;
226+ return ;
227+ }
228+ this . #ongoingTransaction = false ;
229+ resolve ( ) ;
230+ } ) ;
231+ } ) ;
232+ }
233+
234+ async sendOffsets ( arg ) {
235+ let { consumerGroupId, topics, consumer } = arg ;
236+
237+ if ( ( ! consumerGroupId && ! consumer ) || ! Array . isArray ( topics ) || topics . length === 0 ) {
238+ return Promise . reject ( "sendOffsets must have the arguments {consumerGroupId: string or consumer: Consumer, topics: non-empty array" ) ;
239+ }
240+
241+ if ( this . #state !== ProducerState . CONNECTED ) {
242+ return Promise . reject ( "Cannot sendOffsets without awaiting connect()" ) ;
243+ }
244+
245+ if ( ! this . #ongoingTransaction) {
246+ return Promise . reject ( "Cannot sendOffsets, no transaction ongoing." ) ;
247+ }
248+
249+ // If we don't have a consumer, we must create a consumer at this point internally.
250+ // This isn't exactly efficient, but we expect people to use either a consumer,
251+ // or we will need to change the C/C++ code to facilitate using the consumerGroupId
252+ // directly.
253+ // TODO: Change the C/C++ code to facilitate this if we go to release with this.
254+
255+ let consumerCreated = false ;
256+ if ( ! consumer ) {
257+ const config = Object . assign ( { groupId : consumerGroupId } , this . #kJSConfig) ;
258+ consumer = new Consumer ( config ) ;
259+ consumerCreated = true ;
260+ await consumer . connect ( ) ;
261+ }
262+
263+ return new Promise ( ( resolve , reject ) => {
264+ this . #internalClient. sendOffsetsToTransaction (
265+ this . #flattenTopicPartitionOffsets( topics ) . map ( topicPartitionOffsetToRdKafka ) ,
266+ consumer . _getInternalConsumer ( ) ,
267+ async err => {
268+ if ( consumerCreated )
269+ await consumer . disconnect ( ) ;
270+ if ( err )
271+ reject ( err ) ;
272+ else
273+ resolve ( ) ;
274+ } )
275+ } ) ;
276+ }
277+
124278 async send ( sendOptions ) {
125279 if ( this . #state !== ProducerState . CONNECTED ) {
126280 return Promise . reject ( "Cannot send message without awaiting connect()" ) ;
0 commit comments