@@ -679,6 +679,61 @@ class Producer {
679679
680680 this . #internalClient. setSaslCredentials ( args . username , args . password ) ;
681681 }
682+
683+ /**
684+ * Flushes any pending messages.
685+ *
686+ * Messages are batched internally by librdkafka for performance reasons.
687+ * Continously sent messages are batched upto a timeout, or upto a maximum
688+ * size. Calling flush sends any pending messages immediately without
689+ * waiting for this size or timeout.
690+ *
691+ * @param {number } args.timeout Time to try flushing for in milliseconds.
692+ * @returns {Promise<void> } Resolves on successful flush.
693+ * @throws {KafkaJSTimeout } if the flush times out.
694+ *
695+ * @note This is only useful when using asynchronous sends.
696+ * For example, the following code does not get any benefit from flushing,
697+ * since `await`ing the send waits for the delivery report, and the message
698+ * has already been sent by the time we start flushing:
699+ * for (let i = 0; i < 100; i++) await send(...);
700+ * await flush(...) // Not useful.
701+ *
702+ * However, using the following code may put these 5 messages into a batch
703+ * and then the subsequent `flush` will send the batch altogether (as long as
704+ * batch size, etc. are conducive to batching):
705+ * for (let i = 0; i < 5; i++) send(...);
706+ * await flush({timeout: 5000});
707+ */
708+ async flush ( args = { timeout : 500 } ) {
709+ if ( this . #state !== ProducerState . CONNECTED ) {
710+ throw new error . KafkaJSError ( "Cannot flush without awaiting connect()" , { code : error . ErrorCodes . ERR__STATE } ) ;
711+ }
712+
713+ if ( ! Object . hasOwn ( args , 'timeout' ) ) {
714+ throw new error . KafkaJSError ( "timeout must be set for flushing" , { code : error . ErrorCodes . ERR__INVALID_ARG } ) ;
715+ }
716+
717+ return new Promise ( ( resolve , reject ) => {
718+ this . #internalClient. flush ( args . timeout , ( err ) => {
719+ if ( err ) {
720+ const kjsErr = createKafkaJsErrorFromLibRdKafkaError ( err ) ;
721+ if ( err . code === error . ErrorCodes . ERR__TIMED_OUT ) {
722+ /* See reason below for yield. Same here - but for partially processed delivery reports. */
723+ setTimeout ( ( ) => reject ( kjsErr ) , 0 ) ;
724+ } else {
725+ reject ( kjsErr ) ;
726+ }
727+ return ;
728+ }
729+ /* Yielding here allows any 'then's and 'awaits' on associated sends to be scheduled
730+ * before flush completes, which means that the user doesn't have to yield themselves.
731+ * It's not necessary that all the 'then's and 'awaits' will be able to run, but
732+ * it's better than nothing. */
733+ setTimeout ( resolve , 0 ) ;
734+ } ) ;
735+ } ) ;
736+ }
682737}
683738
684739module . exports = { Producer, CompressionTypes } ;
0 commit comments