Commit f29bc15

Add topic configuration and fix config types

1 parent e3bb095

8 files changed, +283 -254 lines changed

examples/kafkajs/consumer.js
Lines changed: 6 additions & 1 deletion

@@ -1,4 +1,4 @@
-const { Kafka } = require('../..').KafkaJS
+const { Kafka } = require('../..').KafkaJS;
 //const { Kafka } = require('kafkajs')
 
 async function consumerStart() {
@@ -32,7 +32,12 @@ async function consumerStart() {
       }
     },
     rdKafka: {
+      globalConfig: {
        'enable.auto.commit': false
+      },
+      topicConfig: {
+        'auto.offset.reset': 'earliest'
+      },
     }
   });
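
For reference, a minimal sketch of a consumer configured with the new split rdKafka block. It follows the kafkajs-style API used by this example; the broker address, group id, and property values are placeholders rather than part of this commit. Client-wide librdkafka properties go under globalConfig, while default topic properties such as 'auto.offset.reset' go under topicConfig.

const { Kafka } = require('../..').KafkaJS;

const kafka = new Kafka({
  brokers: ['localhost:9092'],           // placeholder broker list
});

const consumer = kafka.consumer({
  groupId: 'test-group',                 // placeholder group id
  rdKafka: {
    globalConfig: {
      'enable.auto.commit': false,       // client-level librdkafka property
    },
    topicConfig: {
      'auto.offset.reset': 'earliest',   // default topic-level librdkafka property
    },
  },
});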

examples/kafkajs/eos.js
Lines changed: 5 additions & 2 deletions

@@ -15,7 +15,9 @@ async function eosStart() {
   const consumer = kafka.consumer({
     groupId: 'groupId',
     rdKafka: {
-      "enable.auto.commit": false,
+      globalConfig: {
+        "enable.auto.commit": false,
+      }
     },
   });
 
@@ -34,7 +36,8 @@ async function eosStart() {
   // The run method acts like a consume-transform-produce loop.
   consumer.run({
     eachMessage: async ({ topic, partition, message }) => {
-      const msgAckString = JSON.stringify({topic,
+      const msgAckString = JSON.stringify({
+        topic,
         partition,
         offset: message.offset,
         key: message.key?.toString(),
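
One consequence of the new shape, implied by the Object.assign calls in lib/kafkajs/_common.js below: librdkafka properties placed directly under rdKafka (the old object form) are no longer merged. A hedged sketch of the corrected EOS consumer block; the client construction is a placeholder standing in for the setup earlier in the example.

const { Kafka } = require('../..').KafkaJS;
const kafka = new Kafka({ brokers: ['localhost:9092'] });  // placeholder client setup

const consumer = kafka.consumer({
  groupId: 'groupId',
  rdKafka: {
    // Old form, no longer merged after this commit:
    //   "enable.auto.commit": false,
    globalConfig: {
      "enable.auto.commit": false,   // client-level override, as in the diff above
    },
  },
});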

lib/kafkajs/_common.js
Lines changed: 28 additions & 19 deletions

@@ -1,46 +1,55 @@
+/**
+ * @function kafkaJSToRdKafkaConfig()
+ * @param {object} config
+ * @returns {{globalConfig: import("../../types/config").ConsumerGlobalConfig|import("../../types/config").ProducerTopicConfig, topicConfig: import("../../types/config").ConsumerTopicConfig|import("../../types/config").ProducerTopicConfig}}
+ */
 async function kafkaJSToRdKafkaConfig(config) {
-  const ret = {
-    'allow.auto.create.topics': 'false'
-  }
-  ret['bootstrap.servers'] = config['brokers'].join(',');
+  const globalConfig = {
+    "allow.auto.create.topics": "false",
+  };
+  const topicConfig = {};
+  globalConfig["bootstrap.servers"] = config["brokers"].join(",");
 
   let withSASL = false;
 
   if (config.sasl) {
-    const sasl = config.sasl;
-    if (sasl.mechanism === 'plain' &&
-      typeof sasl.username === 'string' &&
-      typeof sasl.password === 'string') {
-      ret['sasl.mechanism'] = 'PLAIN';
-      ret['sasl.username'] = sasl.username;
-      ret['sasl.password'] = sasl.password;
-      withSASL = true;
+    const sasl = config.sasl;
+    if (
+      sasl.mechanism === "plain" &&
+      typeof sasl.username === "string" &&
+      typeof sasl.password === "string"
+    ) {
+      globalConfig["sasl.mechanism"] = "PLAIN";
+      globalConfig["sasl.username"] = sasl.username;
+      globalConfig["sasl.password"] = sasl.password;
+      withSASL = true;
     }
   }
 
   if (config.ssl === true && withSASL) {
-    ret['security.protocol'] = 'sasl_ssl';
+    globalConfig["security.protocol"] = "sasl_ssl";
   } else if (withSASL) {
-    ret['security.protocol'] = 'sasl_plaintext';
+    globalConfig["security.protocol"] = "sasl_plaintext";
   }
 
   if (config.rdKafka) {
     if (config.rdKafka.constructor === Function) {
-      await config.rdKafka(ret);
+      await config.rdKafka(globalConfig, topicConfig);
     } else {
-      Object.assign(ret, config.rdKafka);
+      Object.assign(globalConfig, config.rdKafka.globalConfig);
+      Object.assign(topicConfig, config.rdKafka.topicConfig);
     }
   }
 
-  return ret;
+  return { globalConfig, topicConfig };
 }
 
 function topicPartitionOffsetToRdKafka(tpo) {
   return {
     topic: tpo.topic,
     partition: tpo.partition,
     offset: Number(tpo.offset),
-  }
+  };
 }
 
-module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka }
+module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka };
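
A short usage sketch of the updated helper; the caller and input values are hypothetical, but the behaviour follows the code above. The object form of rdKafka is split across the two returned maps, and the function form now receives both maps to mutate.

const { kafkaJSToRdKafkaConfig } = require('./_common');

async function demo() {
  // Object form: overrides are routed to the matching map.
  const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig({
    brokers: ['localhost:9092'],                        // placeholder
    rdKafka: {
      globalConfig: { 'client.id': 'demo-app' },        // merged into globalConfig
      topicConfig: { 'auto.offset.reset': 'earliest' }, // merged into topicConfig
    },
  });
  // globalConfig: allow.auto.create.topics, bootstrap.servers, client.id
  // topicConfig:  auto.offset.reset

  // Function form: the callback can now adjust both maps directly.
  await kafkaJSToRdKafkaConfig({
    brokers: ['localhost:9092'],
    rdKafka: (globalConfig, topicConfig) => {
      globalConfig['statistics.interval.ms'] = 1000;
      topicConfig['acks'] = -1;   // -1 is equivalent to acks=all
    },
  });
}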

lib/kafkajs/_consumer.js
Lines changed: 70 additions & 65 deletions

@@ -3,8 +3,8 @@ const RdKafka = require('../rdkafka');
 const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common');
 
 const ConsumerState = Object.freeze({
-    INIT: 0,
-    CONNECTING: 1,
+  INIT: 0,
+  CONNECTING: 1,
   CONNECTED: 2,
   DISCONNECTING: 3,
   DISCONNECTED: 4,
@@ -17,38 +17,42 @@ class Consumer {
   #connectPromiseFunc = {};
   #state = ConsumerState.INIT;
 
+  /**
+   * @constructor
+   * @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
+   */
   constructor(kJSConfig) {
     this.#kJSConfig = kJSConfig;
   }
 
-  #config() {
+  async #config() {
     if (!this.#rdKafkaConfig)
-      this.#rdKafkaConfig = this.#finalizedConfig();
+      this.#rdKafkaConfig = await this.#finalizedConfig();
     return this.#rdKafkaConfig;
   }
 
   async #finalizedConfig() {
-    const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
+    const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
     if (this.#kJSConfig.groupId) {
-      config['group.id'] = this.#kJSConfig.groupId;
+      globalConfig['group.id'] = this.#kJSConfig.groupId;
     }
-    config['offset_commit_cb'] = true;
+    globalConfig['offset_commit_cb'] = true;
     if (this.#kJSConfig.rebalanceListener) {
-      config['rebalance_cb'] = (err, assignment) => {
+      globalConfig['rebalance_cb'] = (err, assignment) => {
         // Create the librdkafka error
         err = LibrdKafkaError.create(err);
 
         let call;
-        switch(err.code) {
+        switch (err.code) {
          case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS:
            call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ?
-              this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
-              Promise.resolve()).catch(console.error);
+               this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
+               Promise.resolve()).catch(console.error);
            break;
          case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS:
            call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ?
-              this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
-              Promise.resolve()).catch(console.error);
+               this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
+               Promise.resolve()).catch(console.error);
            break;
          default:
            call = Promise.reject().catch(() => {
@@ -58,46 +62,46 @@ class Consumer {
        }
 
        call
-        .finally(() => {
-          // Emit the event
-          this.#internalClient.emit('rebalance', err, assignment);
-
-          try {
-            if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
-              this.#internalClient.assign(assignment);
-            } else {
-              this.#internalClient.unassign();
+         .finally(() => {
+           // Emit the event
+           this.#internalClient.emit('rebalance', err, assignment);
+
+           try {
+             if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+               this.#internalClient.assign(assignment);
+             } else {
+               this.#internalClient.unassign();
+             }
+           } catch (e) {
+             // Ignore exceptions if we are not connected
+             if (this.#internalClient.isConnected()) {
+               this.#internalClient.emit('rebalance.error', e);
+             }
            }
-          } catch (e) {
-            // Ignore exceptions if we are not connected
-            if (this.#internalClient.isConnected()) {
-              this.#internalClient.emit('rebalance.error', e);
-            }
-          }
-        });
+         });
      };
    }
-    return config;
+    return { globalConfig, topicConfig };
  }
 
  #readyCb(arg) {
-      if (this.#state !== ConsumerState.CONNECTING) {
-        // I really don't know how to handle this now.
-        return;
-      }
-      this.#state = ConsumerState.CONNECTED;
+    if (this.#state !== ConsumerState.CONNECTING) {
+      // I really don't know how to handle this now.
+      return;
+    }
+    this.#state = ConsumerState.CONNECTED;
 
-      // Resolve the promise.
-      this.#connectPromiseFunc['resolve']();
+    // Resolve the promise.
+    this.#connectPromiseFunc['resolve']();
  }
 
  #errorCb(args) {
-      console.log('error', args);
-      if (this.#state === ConsumerState.CONNECTING) {
-        this.#connectPromiseFunc['reject'](args);
-      } else {
-        // do nothing for now.
-      }
+    console.log('error', args);
+    if (this.#state === ConsumerState.CONNECTING) {
+      this.#connectPromiseFunc['reject'](args);
+    } else {
+      // do nothing for now.
+    }
  }
 
  #notImplemented() {
@@ -111,7 +115,7 @@ class Consumer {
    }
 
    let timestamp = message.timestamp ? new Date(message.timestamp).toISOString()
-      : '';
+        : '';
 
    var headers = undefined;
    if (message.headers) {
@@ -139,14 +143,14 @@ class Consumer {
        size: message.size,
        headers
      },
-      heartbeat: async () => {},
-      pause: () => {}
+      heartbeat: async () => { },
+      pause: () => { }
    }
  }
 
  async #consumeSingle() {
    return new Promise((resolve, reject) => {
-      this.#internalClient.consume(1, function(err, messages) {
+      this.#internalClient.consume(1, function (err, messages) {
        if (err) {
          reject(`Consume error code ${err.code}`);
          return;
@@ -168,7 +172,7 @@ class Consumer {
      });
      else {
        for (let partition of topic.partitions) {
-          ret.push({topic: topic.topic, partition});
+          ret.push({ topic: topic.topic, partition });
        }
      }
    }
@@ -180,22 +184,23 @@ class Consumer {
  }
 
  async connect() {
-      if (this.#state !== ConsumerState.INIT) {
-        return Promise.reject('Connect has already been called elsewhere.');
-      }
+    if (this.#state !== ConsumerState.INIT) {
+      return Promise.reject('Connect has already been called elsewhere.');
+    }
 
-      this.#state = ConsumerState.CONNECTING;
-      this.#internalClient = new RdKafka.KafkaConsumer(await this.#config());
-      this.#internalClient.on('ready', this.#readyCb.bind(this));
-      this.#internalClient.on('event.error', this.#errorCb.bind(this));
-      this.#internalClient.on('event.log', console.log);
-
-      return new Promise((resolve, reject) => {
-        this.#connectPromiseFunc = {resolve, reject};
-        console.log('Connecting....');
-        this.#internalClient.connect();
-        console.log('connect() called');
-      });
+    this.#state = ConsumerState.CONNECTING;
+    const { globalConfig, topicConfig } = await this.#config();
+    this.#internalClient = new RdKafka.KafkaConsumer(globalConfig, topicConfig);
+    this.#internalClient.on('ready', this.#readyCb.bind(this));
+    this.#internalClient.on('event.error', this.#errorCb.bind(this));
+    this.#internalClient.on('event.log', console.log);
+
+    return new Promise((resolve, reject) => {
+      this.#connectPromiseFunc = { resolve, reject };
+      console.log('Connecting....');
+      this.#internalClient.connect();
+      console.log('connect() called');
+    });
  }
 
  async subscribe(subscription) {
@@ -208,7 +213,7 @@ class Consumer {
 
  async run(config) {
    if (this.#state !== ConsumerState.CONNECTED) {
-        throw new Error('Run must be called in state CONNECTED.');
+      throw new Error('Run must be called in state CONNECTED.');
    }
 
    while (this.#state === ConsumerState.CONNECTED) {
@@ -240,7 +245,7 @@ class Consumer {
  seek(topicPartitionOffset) {
    return new Promise((resolve, reject) => {
      const rdKafkaTopicPartitionOffset =
-       topicPartitionOffsetToRdKafka(topicPartitionOffset);
+        topicPartitionOffsetToRdKafka(topicPartitionOffset);
      this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => {
        if (err) {
          reject(new Error(`Seek error code ${err.code}`));
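
The wrapper now hands both maps straight to the underlying client: as the connect() hunk above shows, RdKafka.KafkaConsumer takes the global configuration and the default topic configuration as two separate constructor arguments. A minimal standalone sketch of that underlying call, with placeholder values; this is roughly what connect() wires up internally.

const RdKafka = require('../rdkafka');

const internalClient = new RdKafka.KafkaConsumer(
  {
    'bootstrap.servers': 'localhost:9092',   // placeholder, derived from the kafkaJS brokers list
    'group.id': 'test-group',                // placeholder, derived from groupId
    'enable.auto.commit': false,
  },
  {
    'auto.offset.reset': 'earliest',         // default topic property
  }
);

internalClient.on('ready', () => console.log('consumer ready'));
internalClient.connect();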
