Skip to content

Commit ad3425f

Browse files
committed
more tests
1 parent 2c64abc commit ad3425f

File tree

3 files changed

+108
-25
lines changed

3 files changed

+108
-25
lines changed

connector/src/main/java/io/questdb/kafka/BufferingSender.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,28 @@
99
import java.util.List;
1010
import java.util.Set;
1111

12+
/**
13+
* Allow to add regular fields before adding symbols.
14+
* <p>
15+
* Internally it buffers symbols and fields and sends them on calling <code>atNow()</code> or <code>at()</code>.
16+
*
17+
*/
1218
final class BufferingSender implements Sender {
13-
private final Sender sender;
14-
15-
private List<CharSequence> timestampNames = new ArrayList<>(4);
16-
private LongList timestampValues = new LongList(4);
17-
18-
private List<CharSequence> longNames = new ArrayList<>(4);
19-
private LongList longValues = new LongList(4);
20-
21-
private List<CharSequence> doubleNames = new ArrayList<>(4);
22-
private DoubleList doubleValues = new DoubleList();
19+
private static final int DEFAULT_CAPACITY = 4;
2320

24-
private List<CharSequence> boolNames = new ArrayList<>(4);
25-
private BoolList boolValues = new BoolList(4);
26-
27-
private List<CharSequence> stringNames = new ArrayList<>(4);
28-
private List<CharSequence> stringValues = new ArrayList<>(4);
29-
30-
private List<CharSequence> symbolColumnNames = new ArrayList<>(4);
31-
private List<CharSequence> symbolColumnValues = new ArrayList<>(4);
21+
private final Sender sender;
22+
private final List<CharSequence> timestampNames = new ArrayList<>(DEFAULT_CAPACITY);
23+
private final LongList timestampValues = new LongList(DEFAULT_CAPACITY);
24+
private final List<CharSequence> longNames = new ArrayList<>(DEFAULT_CAPACITY);
25+
private final LongList longValues = new LongList(DEFAULT_CAPACITY);
26+
private final List<CharSequence> doubleNames = new ArrayList<>(DEFAULT_CAPACITY);
27+
private final DoubleList doubleValues = new DoubleList(DEFAULT_CAPACITY);
28+
private final List<CharSequence> boolNames = new ArrayList<>(DEFAULT_CAPACITY);
29+
private final BoolList boolValues = new BoolList(DEFAULT_CAPACITY);
30+
private final List<CharSequence> stringNames = new ArrayList<>(DEFAULT_CAPACITY);
31+
private final List<CharSequence> stringValues = new ArrayList<>(DEFAULT_CAPACITY);
32+
private final List<CharSequence> symbolColumnNames = new ArrayList<>(DEFAULT_CAPACITY);
33+
private final List<CharSequence> symbolColumnValues = new ArrayList<>(DEFAULT_CAPACITY);
3234

3335

3436
private Set<CharSequence> symbolColumns = new HashSet<>();

connector/src/main/java/io/questdb/kafka/DoubleList.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
11
package io.questdb.kafka;
22

3-
public class DoubleList {
3+
final class DoubleList {
44
private double[] buffer;
55
private int pos = 0;
66

7-
public DoubleList() {
8-
buffer = new double[4];
7+
DoubleList(int initialCapacity) {
8+
buffer = new double[initialCapacity];
99
}
1010

11-
public void add(double value) {
11+
void add(double value) {
1212
ensureCapacity0(pos + 1);
1313
buffer[pos++] = value;
1414
}
1515

16-
public double get(int index) {
16+
double get(int index) {
1717
if (index >= pos) {
1818
throw new ArrayIndexOutOfBoundsException(index);
1919
}
2020
return buffer[index];
2121
}
2222

23-
public void clear() {
23+
void clear() {
2424
pos = 0;
2525
}
2626

27-
private void ensureCapacity0(int i) {
27+
void ensureCapacity0(int i) {
2828
if (i > buffer.length) {
2929
double[] newBuffer = new double[buffer.length << 1];
3030
System.arraycopy(buffer, 0, newBuffer, 0, pos);

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,87 @@ public void testSymbol() {
139139
"select firstname,lastname,age from " + topicName);
140140
}
141141

142+
@Test
143+
public void testSymbol_withAllOtherILPTypes() {
144+
connect.kafka().createTopic(topicName, 1);
145+
Map<String, String> props = baseConnectorProps(topicName);
146+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname");
147+
connect.configureConnector(CONNECTOR_NAME, props);
148+
assertConnectorTaskRunningEventually();
149+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
150+
.field("firstname", Schema.STRING_SCHEMA)
151+
.field("lastname", Schema.STRING_SCHEMA)
152+
.field("age", Schema.INT8_SCHEMA)
153+
.field("vegan", Schema.BOOLEAN_SCHEMA)
154+
.field("height", Schema.FLOAT64_SCHEMA)
155+
.field("birth", Timestamp.SCHEMA)
156+
.build();
157+
158+
java.util.Date birth = new Calendar.Builder()
159+
.setTimeZone(TimeZone.getTimeZone("UTC"))
160+
.setDate(2022, 9, 23) // note: month is 0-based
161+
.setTimeOfDay(13, 53, 59, 123)
162+
.build().getTime();
163+
Struct p1 = new Struct(schema)
164+
.put("firstname", "John")
165+
.put("lastname", "Doe")
166+
.put("age", (byte) 42)
167+
.put("vegan", true)
168+
.put("height", 1.80)
169+
.put("birth", birth);
170+
171+
birth = new Calendar.Builder()
172+
.setTimeZone(TimeZone.getTimeZone("UTC"))
173+
.setDate(2021, 9, 23) // note: month is 0-based
174+
.setTimeOfDay(13, 53, 59, 123)
175+
.build().getTime();
176+
Struct p2 = new Struct(schema)
177+
.put("firstname", "Jane")
178+
.put("lastname", "Doe")
179+
.put("age", (byte) 41)
180+
.put("vegan", false)
181+
.put("height", 1.60)
182+
.put("birth", birth);
183+
184+
connect.kafka().produce(topicName, "p1", new String(converter.fromConnectData(topicName, schema, p1)));
185+
connect.kafka().produce(topicName, "p2", new String(converter.fromConnectData(topicName, schema, p2)));
186+
187+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"vegan\",\"height\",\"birth\"\r\n"
188+
+ "\"John\",\"Doe\",42,true,1.8,\"2022-10-23T13:53:59.123000Z\"\r\n"
189+
+ "\"Jane\",\"Doe\",41,false,1.6,\"2021-10-23T13:53:59.123000Z\"\r\n",
190+
"select firstname,lastname,age,vegan,height,birth from " + topicName);
191+
}
192+
193+
@Test
194+
public void testUpfrontTable_withSymbols() {
195+
connect.kafka().createTopic(topicName, 1);
196+
Map<String, String> props = baseConnectorProps(topicName);
197+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname");
198+
connect.configureConnector(CONNECTOR_NAME, props);
199+
assertConnectorTaskRunningEventually();
200+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
201+
.field("firstname", Schema.STRING_SCHEMA)
202+
.field("lastname", Schema.STRING_SCHEMA)
203+
.field("age", Schema.INT8_SCHEMA)
204+
.build();
205+
206+
Struct struct = new Struct(schema)
207+
.put("firstname", "John")
208+
.put("lastname", "Doe")
209+
.put("age", (byte) 42);
210+
211+
QuestDBUtils.assertSql(questDBContainer,
212+
"{\"ddl\":\"OK\"}\n",
213+
"create table " + topicName + " (firstname symbol, lastname symbol, age int)",
214+
QuestDBUtils.Endpoint.EXEC);
215+
216+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
217+
218+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n"
219+
+ "\"John\",\"Doe\",42,\"key\"\r\n",
220+
"select * from " + topicName);
221+
}
222+
142223
@Test
143224
public void testUpfrontTable() {
144225
connect.kafka().createTopic(topicName, 1);

0 commit comments

Comments
 (0)