11const error = require ( "./_error" ) ;
2- const LibrdKafkaError = require ( '../error' ) ;
32
43/**
54 * @function kafkaJSToRdKafkaConfig()
@@ -47,7 +46,13 @@ async function kafkaJSToRdKafkaConfig(config) {
4746 return { globalConfig, topicConfig } ;
4847}
4948
49+ /**
50+ * Converts a topicPartitionOffset from KafkaJS to a format that can be used by node-rdkafka.
51+ * @param {import("../../types/kafkajs").TopicPartitionOffset } tpo
52+ * @returns {{topic: string, partition: number, offset: number} }
53+ */
5054function topicPartitionOffsetToRdKafka ( tpo ) {
55+ // TODO: do we need some checks for negative offsets and stuff? Or 'named' offsets?
5156 return {
5257 topic : tpo . topic ,
5358 partition : tpo . partition ,
@@ -57,8 +62,8 @@ function topicPartitionOffsetToRdKafka(tpo) {
5762
5863/**
5964 * Convert a librdkafka error from node-rdkafka into a KafkaJSError.
60- * @param {LibrdKafkaError } librdKafkaError to convert from.
61- * @returns KafkaJSError
65+ * @param {import("../error") } librdKafkaError to convert from.
66+ * @returns { error. KafkaJSError} the converted error.
6267 */
6368function createKafkaJsErrorFromLibRdKafkaError ( librdKafkaError ) {
6469 const properties = {
@@ -72,34 +77,57 @@ function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
7277 let err = null ;
7378
7479 if ( properties . code === error . ErrorCodes . ERR_OFFSET_OUT_OF_RANGE ) {
75- err = new error . KafkaJSOffsetOutOfRange ( e , properties ) ;
80+ err = new error . KafkaJSOffsetOutOfRange ( librdKafkaError , properties ) ;
7681 } else if ( properties . code === error . ErrorCodes . ERR_REQUEST_TIMED_OUT ) {
77- err = new error . KafkaJSRequestTimeoutError ( e , properties ) ;
82+ err = new error . KafkaJSRequestTimeoutError ( librdKafkaError , properties ) ;
7883 } else if ( properties . code === error . ErrorCodes . ERR__PARTIAL ) {
79- err = new error . KafkaJSPartialMessageError ( e , properties ) ;
84+ err = new error . KafkaJSPartialMessageError ( librdKafkaError , properties ) ;
8085 } else if ( properties . code === error . ErrorCodes . ERR__AUTHENTICATION ) {
81- err = new error . KafkaJSSASLAuthenticationError ( e , properties ) ;
86+ err = new error . KafkaJSSASLAuthenticationError ( librdKafkaError , properties ) ;
8287 } else if ( properties . code === error . ErrorCodes . ERR_GROUP_COORDINATOR_NOT_AVAILABLE ) {
83- err = new error . KafkaJSGroupCoordinatorNotAvailableError ( e , properties ) ;
88+ err = new error . KafkaJSGroupCoordinatorNotAvailableError ( librdKafkaError , properties ) ;
8489 } else if ( properties . code === error . ErrorCodes . ERR__NOT_IMPLEMENTED ) {
85- err = new error . KafkaJSNotImplemented ( e , properties ) ;
90+ err = new error . KafkaJSNotImplemented ( librdKafkaError , properties ) ;
8691 } else if ( properties . code === error . ErrorCodes . ERR__TIMED_OUT ) {
87- err = new error . KafkaJSTimedOut ( e , properties ) ;
92+ err = new error . KafkaJSTimedOut ( librdKafkaError , properties ) ;
8893 } else if ( properties . code === error . ErrorCodes . ERR__ALL_BROKERS_DOWN ) {
89- err = new error . KafkaJSNoBrokerAvailableError ( e , properties ) ;
94+ err = new error . KafkaJSNoBrokerAvailableError ( librdKafkaError , properties ) ;
9095 } else if ( properties . code === error . ErrorCodes . ERR__TRANSPORT ) {
91- err = new error . KafkaJSConnectionError ( e , properties ) ;
96+ err = new error . KafkaJSConnectionError ( librdKafkaError , properties ) ;
9297 } else if ( properties . code > 0 ) { /* Indicates a non-local error */
93- err = new error . KafkaJSProtocolError ( e , properties ) ;
98+ err = new error . KafkaJSProtocolError ( librdKafkaError , properties ) ;
9499 } else {
95- err = new error . KafkaJSError ( e , properties ) ;
100+ err = new error . KafkaJSError ( librdKafkaError , properties ) ;
96101 }
97102
103+ console . log ( "Converted err = " + JSON . stringify ( err , null , 2 ) + " librdkafka erro = " + JSON . stringify ( librdKafkaError , null , 2 ) ) ;
98104 return err ;
99105}
100106
107+ /**
108+ * Converts KafkaJS headers to a format that can be used by node-rdkafka.
109+ * @param {import("../../types/kafkajs").IHeaders|null } kafkaJSHeaders
110+ * @returns {import("../../").MessageHeader[]|null } the converted headers.
111+ */
112+ function convertToRdKafkaHeaders ( kafkaJSHeaders ) {
113+ if ( ! kafkaJSHeaders ) return null ;
114+
115+ const headers = [ ] ;
116+ for ( const [ key , value ] of Object . entries ( kafkaJSHeaders ) ) {
117+ if ( value . constructor === Array ) {
118+ for ( const v of value ) {
119+ headers . push ( { key, value : v } ) ;
120+ }
121+ } else {
122+ headers . push ( { key, value } ) ;
123+ }
124+ }
125+ return headers ;
126+ }
127+
101128module . exports = {
102129 kafkaJSToRdKafkaConfig,
103130 topicPartitionOffsetToRdKafka,
104131 createKafkaJsErrorFromLibRdKafkaError,
132+ convertToRdKafkaHeaders,
105133} ;
0 commit comments