Skip to content

Commit 2c64abc

Browse files
committed
symbol support
1 parent cd65047 commit 2c64abc

File tree

7 files changed

+295
-26
lines changed

7 files changed

+295
-26
lines changed
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package io.questdb.kafka;
2+
3+
import io.questdb.client.Sender;
4+
import io.questdb.std.BoolList;
5+
import io.questdb.std.LongList;
6+
7+
import java.util.ArrayList;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Set;
11+
12+
final class BufferingSender implements Sender {
13+
private final Sender sender;
14+
15+
private List<CharSequence> timestampNames = new ArrayList<>(4);
16+
private LongList timestampValues = new LongList(4);
17+
18+
private List<CharSequence> longNames = new ArrayList<>(4);
19+
private LongList longValues = new LongList(4);
20+
21+
private List<CharSequence> doubleNames = new ArrayList<>(4);
22+
private DoubleList doubleValues = new DoubleList();
23+
24+
private List<CharSequence> boolNames = new ArrayList<>(4);
25+
private BoolList boolValues = new BoolList(4);
26+
27+
private List<CharSequence> stringNames = new ArrayList<>(4);
28+
private List<CharSequence> stringValues = new ArrayList<>(4);
29+
30+
private List<CharSequence> symbolColumnNames = new ArrayList<>(4);
31+
private List<CharSequence> symbolColumnValues = new ArrayList<>(4);
32+
33+
34+
private Set<CharSequence> symbolColumns = new HashSet<>();
35+
36+
BufferingSender(Sender sender, String symbolColumns) {
37+
this.sender = sender;
38+
if (symbolColumns != null) {
39+
for (String symbolColumn : symbolColumns.split(",")) {
40+
this.symbolColumns.add(symbolColumn.trim());
41+
}
42+
}
43+
}
44+
45+
@Override
46+
public Sender table(CharSequence table) {
47+
return sender.table(table);
48+
}
49+
50+
@Override
51+
public Sender longColumn(CharSequence name, long value) {
52+
if (symbolColumns.contains(name)) {
53+
symbolColumnNames.add(name);
54+
symbolColumnValues.add(String.valueOf(value));
55+
} else {
56+
longNames.add(name);
57+
longValues.add(value);
58+
}
59+
return this;
60+
}
61+
62+
@Override
63+
public Sender stringColumn(CharSequence name, CharSequence value) {
64+
if (symbolColumns.contains(name)) {
65+
symbolColumnNames.add(name);
66+
symbolColumnValues.add(value);
67+
} else {
68+
stringNames.add(name);
69+
stringValues.add(value);
70+
}
71+
return this;
72+
}
73+
74+
@Override
75+
public Sender doubleColumn(CharSequence name, double value) {
76+
if (symbolColumns.contains(name)) {
77+
symbolColumnNames.add(name);
78+
symbolColumnValues.add(String.valueOf(value));
79+
} else {
80+
doubleNames.add(name);
81+
doubleValues.add(value);
82+
}
83+
return this;
84+
}
85+
86+
@Override
87+
public Sender boolColumn(CharSequence name, boolean value) {
88+
if (symbolColumns.contains(name)) {
89+
symbolColumnNames.add(name);
90+
symbolColumnValues.add(String.valueOf(value));
91+
} else {
92+
boolNames.add(name);
93+
boolValues.add(value);
94+
}
95+
return this;
96+
}
97+
98+
@Override
99+
public Sender timestampColumn(CharSequence name, long value) {
100+
if (symbolColumns.contains(name)) {
101+
symbolColumnNames.add(name);
102+
symbolColumnValues.add(String.valueOf(value));
103+
} else {
104+
timestampNames.add(name);
105+
timestampValues.add(value);
106+
}
107+
return this;
108+
}
109+
110+
@Override
111+
public Sender symbol(CharSequence name, CharSequence value) {
112+
symbolColumnNames.add(name);
113+
symbolColumnValues.add(value);
114+
return this;
115+
}
116+
117+
@Override
118+
public void atNow() {
119+
transferFields();
120+
sender.atNow();
121+
}
122+
123+
private void transferFields() {
124+
for (int i = 0, n = symbolColumnNames.size(); i < n; i++) {
125+
CharSequence symbolName = symbolColumnNames.get(i);
126+
CharSequence symbolValue = symbolColumnValues.get(i);
127+
sender.symbol(symbolName, symbolValue);
128+
}
129+
symbolColumnNames.clear();
130+
symbolColumnValues.clear();
131+
132+
for (int i = 0, n = stringNames.size(); i < n; i++) {
133+
CharSequence fieldName = stringNames.get(i);
134+
CharSequence fieldValue = stringValues.get(i);
135+
sender.stringColumn(fieldName, fieldValue);
136+
}
137+
stringNames.clear();
138+
stringValues.clear();
139+
140+
for (int i = 0, n = longNames.size(); i < n; i++) {
141+
CharSequence fieldName = longNames.get(i);
142+
long fieldValue = longValues.get(i);
143+
sender.longColumn(fieldName, fieldValue);
144+
}
145+
longNames.clear();
146+
longValues.clear();
147+
148+
for (int i = 0, n = doubleNames.size(); i < n; i++) {
149+
CharSequence fieldName = doubleNames.get(i);
150+
double fieldValue = doubleValues.get(i);
151+
sender.doubleColumn(fieldName, fieldValue);
152+
}
153+
doubleNames.clear();
154+
doubleValues.clear();
155+
156+
for (int i = 0, n = boolNames.size(); i < n; i++) {
157+
CharSequence fieldName = boolNames.get(i);
158+
boolean fieldValue = boolValues.get(i);
159+
sender.boolColumn(fieldName, fieldValue);
160+
}
161+
boolNames.clear();
162+
boolValues.clear();
163+
164+
for (int i = 0, n = timestampNames.size(); i < n; i++) {
165+
CharSequence fieldName = timestampNames.get(i);
166+
long fieldValue = timestampValues.get(i);
167+
sender.timestampColumn(fieldName, fieldValue);
168+
}
169+
timestampNames.clear();
170+
timestampValues.clear();
171+
}
172+
173+
@Override
174+
public void at(long timestamp) {
175+
transferFields();
176+
sender.at(timestamp);
177+
}
178+
179+
@Override
180+
public void flush() {
181+
sender.flush();
182+
}
183+
184+
@Override
185+
public void close() {
186+
sender.close();
187+
}
188+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.questdb.kafka;
2+
3+
/**
 * A minimal growable array of primitive doubles, avoiding the boxing that
 * {@code List<Double>} would incur. Not thread-safe.
 */
public class DoubleList {
    private double[] data;
    private int size;

    /** Creates an empty list with a small initial capacity. */
    public DoubleList() {
        this.data = new double[4];
        this.size = 0;
    }

    /**
     * Appends a value to the end of the list, doubling the backing array
     * when it is full.
     */
    public void add(double value) {
        if (size == data.length) {
            double[] grown = new double[data.length * 2];
            System.arraycopy(data, 0, grown, 0, size);
            data = grown;
        }
        data[size] = value;
        size++;
    }

    /**
     * Returns the value at {@code index}.
     *
     * @throws ArrayIndexOutOfBoundsException when the index is out of range
     */
    public double get(int index) {
        if (index >= size) {
            throw new ArrayIndexOutOfBoundsException(index);
        }
        return data[index];
    }

    /** Logically empties the list; the backing array is kept for reuse. */
    public void clear() {
        size = 0;
    }
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
2929
public static final String INCLUDE_KEY_CONFIG = "include.key";
3030
private static final String INCLUDE_KEY_DOC = "Include key in the table";
3131

32+
public static final String SYMBOL_COLUMNS_CONFIG = "symbols";
33+
private static final String SYMBOL_COLUMNS_DOC = "Comma separated list of columns that should be symbol type";
34+
3235
public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
3336
super(config, parsedConfig);
3437
}
@@ -45,7 +48,8 @@ public static ConfigDef conf() {
4548
.define(VALUE_PREFIX_CONFIG, Type.STRING, "", Importance.MEDIUM, VALUE_PREFIX_DOC)
4649
.define(SKIP_UNSUPPORTED_TYPES_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, SKIP_UNSUPPORTED_TYPES_DOC)
4750
.define(DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, Type.STRING, null, Importance.MEDIUM, DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC)
48-
.define(INCLUDE_KEY_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, INCLUDE_KEY_DOC);
51+
.define(INCLUDE_KEY_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, INCLUDE_KEY_DOC)
52+
.define(SYMBOL_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, SYMBOL_COLUMNS_DOC);
4953
}
5054

5155
public String getHost() {
@@ -75,4 +79,8 @@ public String getDesignatedTimestampColumnName() {
7579
public boolean isIncludeKey() {
7680
return getBoolean(INCLUDE_KEY_CONFIG);
7781
}
82+
83+
/**
 * Returns the comma-separated list of column names to be inserted as the
 * QuestDB symbol type, or {@code null} when no symbol columns were
 * configured.
 */
public String getSymbolColumns() {
    return getString(SYMBOL_COLUMNS_CONFIG);
}
7886
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,19 @@ public String version() {
4141
@Override
4242
public void start(Map<String, String> map) {
4343
this.config = new QuestDBSinkConnectorConfig(map);
44-
this.sender = Sender.builder().address(config.getHost()).build();
44+
this.sender = createSender();
4545
this.timestampColumnName = config.getDesignatedTimestampColumnName();
4646
}
4747

48+
private Sender createSender() {
49+
Sender rawSender = Sender.builder().address(config.getHost()).build();
50+
String symbolColumns = config.getSymbolColumns();
51+
if (symbolColumns == null) {
52+
return rawSender;
53+
}
54+
return new BufferingSender(rawSender, symbolColumns);
55+
}
56+
4857
@Override
4958
public void put(Collection<SinkRecord> collection) {
5059
for (SinkRecord record : collection) {
@@ -85,6 +94,18 @@ private void handleStruct(String parentName, Struct value, Schema schema) {
8594
}
8695
}
8796

97+
private void handleMap(String name, Map<?, ?> value, String fallbackName) {
98+
for (Map.Entry<?, ?> entry : value.entrySet()) {
99+
Object mapKey = entry.getKey();
100+
if (!(mapKey instanceof String)) {
101+
throw new ConnectException("Map keys must be strings");
102+
}
103+
String mapKeyName = (String) mapKey;
104+
String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
105+
handleObject(entryName, null, entry.getValue(), fallbackName);
106+
}
107+
}
108+
88109
private boolean isDesignatedColumnName(String name, String fallbackName) {
89110
if (timestampColumnName == null) {
90111
return false;
@@ -159,15 +180,7 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
159180
} else if (value instanceof Double) {
160181
sender.doubleColumn(actualName, (Double) value);
161182
} else if (value instanceof Map) {
162-
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
163-
Object mapKey = entry.getKey();
164-
if (!(mapKey instanceof String)) {
165-
throw new ConnectException("Map keys must be strings");
166-
}
167-
String mapKeyName = (String) mapKey;
168-
String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
169-
handleObject(entryName, null, entry.getValue(), fallbackName);
170-
}
183+
handleMap(name, (Map<?, ?>) value, fallbackName);
171184
} else {
172185
onUnsupportedType(actualName, value.getClass().getName());
173186
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,31 @@ public void testSmoke() {
114114
"select firstname,lastname,age from " + topicName);
115115
}
116116

117+
@Test
118+
public void testSymbol() {
119+
connect.kafka().createTopic(topicName, 1);
120+
Map<String, String> props = baseConnectorProps(topicName);
121+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname");
122+
connect.configureConnector(CONNECTOR_NAME, props);
123+
assertConnectorTaskRunningEventually();
124+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
125+
.field("firstname", Schema.STRING_SCHEMA)
126+
.field("lastname", Schema.STRING_SCHEMA)
127+
.field("age", Schema.INT8_SCHEMA)
128+
.build();
129+
130+
Struct struct = new Struct(schema)
131+
.put("firstname", "John")
132+
.put("lastname", "Doe")
133+
.put("age", (byte) 42);
134+
135+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
136+
137+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n"
138+
+ "\"John\",\"Doe\",42\r\n",
139+
"select firstname,lastname,age from " + topicName);
140+
}
141+
117142
@Test
118143
public void testUpfrontTable() {
119144
connect.kafka().createTopic(topicName, 1);

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ public class DebeziumIT {
6666
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb-connector.jar")
6767
.withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb.jar")
6868
.dependsOn(kafkaContainer)
69-
// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium")))
69+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium")))
7070
.withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true")
7171
.withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true");
7272

7373
@Container
7474
private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")
7575
.withNetwork(network)
7676
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
77-
// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
77+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
7878
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
7979
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
8080

0 commit comments

Comments
 (0)