
Commit c23d3c9

add config for ILP auth
1 parent 87ff4c5 commit c23d3c9

File tree

3 files changed: +39 −2 lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 25 additions & 1 deletion
```diff
@@ -32,6 +32,15 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
     public static final String SYMBOL_COLUMNS_CONFIG = "symbols";
     private static final String SYMBOL_COLUMNS_DOC = "Comma separated list of columns that should be symbol type";
 
+    public static final String USERNAME = "username";
+    private static final String USERNAME_DOC = "Username for QuestDB ILP authentication";
+
+    public static final String TOKEN = "token";
+    private static final String TOKEN_DOC = "Token for QuestDB ILP authentication";
+
+    public static final String TLS = "tls";
+    public static final String TLS_DOC = "Use TLS for connecting to QuestDB";
+
     public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
@@ -49,7 +58,10 @@ public static ConfigDef conf() {
         .define(SKIP_UNSUPPORTED_TYPES_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, SKIP_UNSUPPORTED_TYPES_DOC)
         .define(DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, Type.STRING, null, Importance.MEDIUM, DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC)
         .define(INCLUDE_KEY_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, INCLUDE_KEY_DOC)
-        .define(SYMBOL_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, SYMBOL_COLUMNS_DOC);
+        .define(SYMBOL_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, SYMBOL_COLUMNS_DOC)
+        .define(USERNAME, Type.STRING, null, Importance.MEDIUM, USERNAME_DOC)
+        .define(TOKEN, Type.STRING, null, Importance.MEDIUM, TOKEN_DOC)
+        .define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC);
     }
 
     public String getHost() {
@@ -83,4 +95,16 @@ public boolean isIncludeKey() {
     public String getSymbolColumns() {
         return getString(SYMBOL_COLUMNS_CONFIG);
     }
+
+    public String getUsername() {
+        return getString(USERNAME);
+    }
+
+    public String getToken() {
+        return getString(TOKEN);
+    }
+
+    public boolean isTls() {
+        return getBoolean(TLS);
+    }
 }
```
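To see how these definitions behave end to end, here is a minimal sketch (not part of the commit) that builds the config from a raw property map. The `host` key and all literal values are assumptions for illustration; only the `username`, `token`, and `tls` keys and the getters are taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

import io.questdb.kafka.QuestDBSinkConnectorConfig;

public class AuthConfigSketch {
    public static void main(String[] args) {
        // Keys match the constants registered in conf(); values are illustrative.
        Map<String, String> props = new HashMap<>();
        props.put("host", "localhost:9009"); // assumed key for the existing host option
        props.put("username", "admin");
        props.put("token", "QgHCOyq35D5HocCMrUGJinEsjEscJlCp7FZQETH21Bw");
        props.put("tls", "true");

        QuestDBSinkConnectorConfig config =
                new QuestDBSinkConnectorConfig(QuestDBSinkConnectorConfig.conf(), props);

        // The new getters fall back to the defaults declared in conf():
        // null for username/token, false for tls.
        System.out.println(config.getUsername()); // admin
        System.out.println(config.isTls());       // true
    }
}
```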

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 11 additions & 1 deletion
```diff
@@ -46,7 +46,17 @@ public void start(Map<String, String> map) {
     }
 
     private Sender createSender() {
-        Sender rawSender = Sender.builder().address(config.getHost()).build();
+        Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost());
+        if (config.isTls()) {
+            builder.enableTls();
+        }
+        if (config.getUsername() != null) {
+            if (config.getToken() == null) {
+                throw new ConnectException("Token is required when username is provided");
+            }
+            builder.enableAuth(config.getUsername()).authToken(config.getToken());
+        }
+        Sender rawSender = builder.build();
         String symbolColumns = config.getSymbolColumns();
         if (symbolColumns == null) {
             return rawSender;
```
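As context for the builder chain above, a standalone sketch (not part of the commit) of driving the same QuestDB ILP Sender API directly. The `address`/`enableTls`/`enableAuth`/`authToken` calls are exactly those used in the diff; the host, user, token, table, and column names are illustrative.

```java
import io.questdb.client.Sender; // QuestDB Java client; package may vary by client version

public class SenderAuthSketch {
    public static void main(String[] args) {
        // Mirrors createSender() above, with literals standing in for
        // config.getHost()/getUsername()/getToken().
        try (Sender sender = Sender.builder()
                .address("localhost:9009")
                .enableTls()
                .enableAuth("admin")
                .authToken("QgHCOyq35D5HocCMrUGJinEsjEscJlCp7FZQETH21Bw")
                .build()) {
            sender.table("kafka_rows")
                  .symbol("topic", "example-topic")
                  .atNow();
            sender.flush();
        }
    }
}
```

Note the design choice in the task code: a `username` without a `token` throws `ConnectException` at task startup, so misconfigured auth fails fast rather than at the first write.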

readme.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -49,6 +49,9 @@ The connector supports following Options:
 | timestamp.field.name | STRING | pickup_time | N/A | Designated timestamp field name |
 | include.key | BOOLEAN | false | true | Include message key in target table |
 | symbols | STRING | instrument,stock | N/A | Comma separated list of columns that should be symbol type |
+| user | STRING | user1 | admin | User name for QuestDB. Used only when token is non-empty |
+| token | STRING | QgHCOyq35D5HocCMrUGJinEsjEscJlCp7FZQETH21Bw | N/A | Token for QuestDB authentication |
+| tls | BOOLEAN | true | false | Use TLS for QuestDB connection |
 
 ## Supported serialization formats
 The connector does not do data deserialization on its own. It relies on Kafka Connect converters to deserialize data. It's been tested predominantly with JSON, but it should work with any converter, including Avro. Converters can be configured using `key.converter` and `value.converter` options, see the table above.
```
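For reference, a hypothetical standalone-worker properties snippet wiring these options together. The `connector.class` name, `topics`, and `host` key are assumptions not shown in this diff. Note a discrepancy in the commit itself: the readme table documents the auth option as `user`, while `QuestDBSinkConnectorConfig` registers it as `username`; the snippet follows the config class.

```properties
name=questdb-sink
# assumed connector class name (follows the io.questdb.kafka package above)
connector.class=io.questdb.kafka.QuestDBSinkConnector
topics=example-topic
# assumed option key for the ILP endpoint (the config exposes getHost())
host=localhost:9009
# the config class registers "username"; the readme table says "user"
username=admin
token=QgHCOyq35D5HocCMrUGJinEsjEscJlCp7FZQETH21Bw
tls=true
```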
