From ea07d9dc3307f5851c2099c5ca50a53c4c3d0f73 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 28 Jul 2025 14:06:38 +0200 Subject: [PATCH 1/9] Update QuestDB client --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 31acf2d..5885e7c 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ org.questdb questdb - 8.2.0 + 9.0.1 org.junit.jupiter From 08f432a55ed81d531f11456def9331c4e2b638a9 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 28 Jul 2025 14:19:29 +0200 Subject: [PATCH 2/9] buffered client stubs --- .../io/questdb/kafka/BufferingSender.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/connector/src/main/java/io/questdb/kafka/BufferingSender.java b/connector/src/main/java/io/questdb/kafka/BufferingSender.java index 36ad3d3..4315a8f 100644 --- a/connector/src/main/java/io/questdb/kafka/BufferingSender.java +++ b/connector/src/main/java/io/questdb/kafka/BufferingSender.java @@ -1,8 +1,11 @@ package io.questdb.kafka; import io.questdb.client.Sender; +import io.questdb.cutlass.line.array.DoubleArray; +import io.questdb.cutlass.line.array.LongArray; import io.questdb.std.BoolList; import io.questdb.std.LongList; +import io.questdb.std.bytes.DirectByteSlice; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -97,6 +100,11 @@ public Sender boolColumn(CharSequence name, boolean value) { return this; } + @Override + public DirectByteSlice bufferView() { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void cancelRow() { symbolColumnNames.clear(); @@ -230,4 +238,44 @@ public void flush() { public void close() { sender.close(); } + + @Override + public Sender doubleArray(CharSequence charSequence, double[] doubles) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender doubleArray(CharSequence charSequence, double[][] doubles) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender doubleArray(CharSequence charSequence, double[][][] doubles) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender doubleArray(CharSequence charSequence, DoubleArray doubleArray) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender longArray(CharSequence charSequence, long[] longs) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender longArray(CharSequence charSequence, long[][] longs) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender longArray(CharSequence charSequence, long[][][] longs) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Sender longArray(CharSequence charSequence, LongArray longArray) { + throw new UnsupportedOperationException("not implemented"); + } } From a60ec9f1b46a5e8acb7253428ae7ab6c631dd28c Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 28 Jul 2025 15:50:58 +0200 Subject: [PATCH 3/9] adjust for new cancelRow() behaviour --- .../src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 1036c3d..a8bab2c 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ 
-325,19 +325,21 @@ private void handleSingleRecord(SinkRecord record) { throw new InvalidDataException("Table name cannot be empty"); } + boolean partialRecord = false; try { sender.table(tableName); + partialRecord = true; if (config.isIncludeKey()) { handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME); } handleObject(config.getValuePrefix(), record.valueSchema(), recordValue, PRIMITIVE_VALUE_FALLBACK_NAME); } catch (InvalidDataException ex) { - if (httpTransport) { + if (httpTransport && partialRecord) { sender.cancelRow(); } throw ex; } catch (LineSenderException ex) { - if (httpTransport) { + if (httpTransport && partialRecord) { sender.cancelRow(); } throw new InvalidDataException("object contains invalid data", ex); From d16cd0548699091fc79191458a6fc97350583374 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 29 Jul 2025 12:53:26 +0200 Subject: [PATCH 4/9] 1D arrays supported --- .../io/questdb/kafka/QuestDBSinkTask.java | 74 +++++- .../io/questdb/kafka/ConnectTestUtils.java | 5 +- .../QuestDBSinkConnectorEmbeddedTest.java | 243 +++++++++++++++++- 3 files changed, 311 insertions(+), 11 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index a8bab2c..f8fefc4 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -114,7 +114,7 @@ private Sender createRawSender() { } return Sender.fromConfig(sink); } - log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options"); + log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. 
See: https://questdb.com/docs/third-party-tools/kafka/#configuration-manual"); Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost()); if (config.isTls()) { builder.enableTls(); @@ -483,6 +483,8 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa } else if (value instanceof java.util.Date) { long epochMillis = ((java.util.Date) value).getTime(); sender.timestampColumn(actualName, TimeUnit.MILLISECONDS.toMicros(epochMillis), ChronoUnit.MICROS); + } else if (value instanceof List) { + handleArrayWithoutSchema(actualName, (List) value); } else { onUnsupportedType(actualName, value.getClass().getName()); } @@ -539,8 +541,10 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec case STRUCT: handleStruct(name, (Struct) value, schema); break; - case BYTES: case ARRAY: + handleArray(sanitizedName, value, schema); + break; + case BYTES: case MAP: default: onUnsupportedType(name, type); @@ -548,6 +552,68 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec return true; } + private void handleArray(String name, Object value, Schema schema) { + if (value == null) { + return; + } + + Schema valueSchema = schema.valueSchema(); + if (valueSchema == null) { + throw new InvalidDataException("Array schema must have a value schema"); + } + + Schema.Type elementType = valueSchema.type(); + + if (elementType == Schema.Type.FLOAT32 || elementType == Schema.Type.FLOAT64) { + List list = (List) value; + // todo: do not allocate new arrays + double[] doubleArray = new double[list.size()]; + for (int i = 0; i < list.size(); i++) { + Object element = list.get(i); + if (element == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + doubleArray[i] = ((Number) element).doubleValue(); + } + sender.doubleArray(name, doubleArray); + } else if (elementType == Schema.Type.ARRAY) { + onUnsupportedType(name, "Multidimensional ARRAY"); + } else { + onUnsupportedType(name, "ARRAY<" + elementType + ">"); + } + } + + private void handleArrayWithoutSchema(String name, List list) { + if (list == null || list.isEmpty()) { + return; + } + + Object firstElement = list.get(0); + if (firstElement == null) { + throw new InvalidDataException("QuestDB array elements cannot be null"); + } + + if (firstElement instanceof Number) { + // todo: do not allocate new arrays + double[] doubleArray = new double[list.size()]; + for (int i = 0; i < list.size(); i++) { + Object element = list.get(i); + if (element == null) { + onUnsupportedType(name, "null element in ARRAY"); + } else if (!(element instanceof Number)) { + onUnsupportedType(name, "ARRAY<" + element.getClass().getSimpleName() + ">"); + } else { + doubleArray[i] = ((Number) element).doubleValue(); + } + } + sender.doubleArray(name, doubleArray); + } else if (firstElement instanceof List) { + onUnsupportedType(name, "Multidimensional ARRAY"); + } else { + onUnsupportedType(name, "ARRAY<" + firstElement.getClass().getSimpleName() + ">"); + } + } + private void onUnsupportedType(String name, Object type) { if (config.isSkipUnsupportedTypes()) { log.debug("Skipping unsupported type: {}, name: {}", type, name); @@ -577,8 +643,8 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { sender.timestampColumn(name, epochMillis, ChronoUnit.MILLIS); return true; case Time.LOGICAL_NAME: - d = (java.util.Date) value; - long dayMillis = d.getTime(); + java.util.Date timeValue = 
(java.util.Date) value; + long dayMillis = timeValue.getTime(); sender.longColumn(name, dayMillis); return true; case Decimal.LOGICAL_NAME: diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java index 663c339..82caed0 100644 --- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java +++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java @@ -57,8 +57,9 @@ static Map baseConnectorProps(GenericContainer questDBContain confString = "http::addr=" + host + ":" + port + ";"; props.put("client.conf.string", confString); } else { - String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); - props.put("host", ilpIUrl); + int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + confString = "tcp::addr=" + host + ":" + port + ";protocol_version=2;"; + props.put("client.conf.string", confString); } return props; } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 8d36948..db47d08 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -52,7 +52,7 @@ public final class QuestDBSinkConnectorEmbeddedTest { private static int httpPort = -1; private static int ilpPort = -1; - private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0"; + private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:9.0.1"; private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true; private EmbeddedConnectCluster connect; @@ -957,7 +957,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt // make sure we have all records in the table QuestDBUtils.assertSqlEventually( - "\"count\"\r\n" + "\"count()\"\r\n" + recordCount + "\r\n", "select count(*) from " + topicName, 600, @@ -1545,15 +1545,28 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes(boolean useHttp) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testJsonNoSchema_ArrayNotSupported(boolean useHttp) { + public void testJsonNoSchema_intArraySendAsDoubleArray(boolean useHttp) { + // In schema-less mode, we have to be lenient with array element types. + // Since floating point numbers without any actual decimal point are + // instantiated as integers by Kafka Connect. + // + // This will become problematic once QuestDB supports arrays of integers, + // as it will not be able to distinguish between an array of integers and + // an array of doubles. + // For now, we just assume that all arrays are of doubles. 
+ + connect.kafka().createTopic(topicName, 1); Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"array\":[1,2,3]}"); + connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"arr\":[1,2,3]}"); - ConnectTestUtils.assertConnectorTaskFailedEventually(connect); + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"arr\"\r\n" + + "\"John\",\"Doe\",42,\"[1.0,2.0,3.0]\"\r\n", + "select firstname,lastname,age,arr from " + topicName, + httpPort); } @ParameterizedTest @@ -2016,4 +2029,224 @@ public void testMultiLevelNestedStructInValue(boolean useHttp) { "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName, httpPort); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFloat32ArraySupport(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with float array + Schema arraySchema = SchemaBuilder.array(Schema.FLOAT32_SCHEMA).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Measurement") + .field("sensor_id", Schema.STRING_SCHEMA) + .field("readings", arraySchema) + .build(); + + Struct struct = new Struct(schema) + .put("sensor_id", "sensor1") + .put("readings", Arrays.asList(23.5f, 24.1f, 23.8f)); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"sensor_id\",\"readings\"\r\n" + + "\"sensor1\",\"[23.5,24.100000381469727,23.799999237060547]\"\r\n", + "select sensor_id, readings from " + topicName, + httpPort + ); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFloat64ArraySupport(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with double array + Schema arraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Measurement") + .field("device", Schema.STRING_SCHEMA) + .field("temperatures", arraySchema) + .build(); + + Struct struct = new Struct(schema) + .put("device", "thermometer1") + .put("temperatures", Arrays.asList(98.6, 99.1, 97.9)); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"device\",\"temperatures\"\r\n" + + "\"thermometer1\",\"[98.6,99.1,97.9]\"\r\n", + "select device, temperatures from " + topicName, + httpPort + ); + } + + @Test + public void testSchemalessFloatArraySupport() { + connect.kafka().createTopic(topicName, 1); + 
Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Send JSON with array of doubles + String json = "{\"location\":\"room1\",\"humidity_readings\":[45.2,46.8,44.9]}"; + connect.kafka().produce(topicName, json); + + QuestDBUtils.assertSqlEventually( + "\"location\",\"humidity_readings\"\r\n" + + "\"room1\",\"[45.2,46.8,44.9]\"\r\n", + "select location, humidity_readings from " + topicName, + httpPort + ); + } + + @Test + public void testSchemalessFloatArraySupport_floatFollowedByInt() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Send JSON with array of doubles + String json = "{\"location\":\"room1\",\"humidity_readings\":[45.0,46,44.9]}"; + connect.kafka().produce(topicName, json); + + QuestDBUtils.assertSqlEventually( + "\"location\",\"humidity_readings\"\r\n" + + "\"room1\",\"[45.0,46.0,44.9]\"\r\n", + "select location, humidity_readings from " + topicName, + httpPort + ); + } + + @Test + public void testSchemalessFloatArraySupport_intFollowedByFloat() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Send JSON with array of doubles + String json = "{\"location\":\"room1\",\"humidity_readings\":[45,46.0,44.9]}"; + connect.kafka().produce(topicName, json); + + QuestDBUtils.assertSqlEventually( + "\"location\",\"humidity_readings\"\r\n" + + "\"room1\",\"[45.0,46.0,44.9]\"\r\n", + "select location, humidity_readings from " + topicName, + httpPort + ); + } + + + @Test + public void testIntegerArrayRejection() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + props.put("errors.tolerance", "none"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with integer array (should fail) + Schema arraySchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Counter") + .field("name", Schema.STRING_SCHEMA) + .field("counts", arraySchema) + .build(); + + Struct struct = new Struct(schema) + .put("name", "counter1") + .put("counts", Arrays.asList(1, 2, 3)); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + // The connector should fail to process this record + ConnectTestUtils.assertConnectorTaskFailedEventually(connect); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testNestedStructWithArray(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, 
JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + Schema arraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema sensorSchema = SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("values", arraySchema) + .build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Device") + .field("id", Schema.STRING_SCHEMA) + .field("sensor", sensorSchema) + .build(); + + Struct sensorStruct = new Struct(sensorSchema) + .put("type", "temperature") + .put("values", Arrays.asList(20.5, 21.0, 20.8)); + + Struct struct = new Struct(schema) + .put("id", "device1") + .put("sensor", sensorStruct); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"id\",\"sensor_type\",\"sensor_values\"\r\n" + + "\"device1\",\"temperature\",\"[20.5,21.0,20.8]\"\r\n", + "select id, sensor_type, sensor_values from " + topicName, + httpPort + ); + } + + @Test + public void testArrayWithSkipUnsupportedTypes() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("skip.unsupported.types", "true"); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with string array (unsupported) + Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Data") + .field("names", arraySchema) + .field("value", Schema.FLOAT64_SCHEMA) + .build(); + + Struct struct = new Struct(schema) + .put("names", Arrays.asList("a", "b", "c")) + .put("value", 42.0); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + // Verify - string array should be skipped but double field should be written + QuestDBUtils.assertSqlEventually( + "\"value\"\r\n" + + "42.0\r\n", + "select value from " + topicName, + httpPort + ); + } } From 82da49a92d2d53f7d4c674da8bac54e35dedc434 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 29 Jul 2025 18:05:27 +0200 Subject: [PATCH 5/9] support for symbols --- .../java/io/questdb/kafka/BufferingSender.java | 14 +++++++++++++- .../java/io/questdb/kafka/QuestDBSinkTask.java | 1 + .../kafka/QuestDBSinkConnectorEmbeddedTest.java | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/io/questdb/kafka/BufferingSender.java b/connector/src/main/java/io/questdb/kafka/BufferingSender.java index 4315a8f..52a6138 100644 --- a/connector/src/main/java/io/questdb/kafka/BufferingSender.java +++ b/connector/src/main/java/io/questdb/kafka/BufferingSender.java @@ -37,6 +37,8 @@ final class BufferingSender implements Sender { private final List symbolColumnNames = new ArrayList<>(DEFAULT_CAPACITY); private final List symbolColumnValues = new ArrayList<>(DEFAULT_CAPACITY); private final Set symbolColumns = new HashSet<>(); + private final List doubleArrayNames = new ArrayList<>(DEFAULT_CAPACITY); + private final List doubleArrayValues = new ArrayList<>(DEFAULT_CAPACITY); BufferingSender(Sender sender, String symbolColumns) { this.sender = sender; @@ -201,6 +203,14 @@ private void transferFields() { } timestampNames.clear(); timestampValues.clear(); + + for 
(int i = 0, n = doubleArrayNames.size(); i < n; i++) { + CharSequence fieldName = doubleArrayNames.get(i); + double[] fieldValue = doubleArrayValues.get(i); + sender.doubleArray(fieldName, fieldValue); + } + doubleArrayNames.clear(); + doubleArrayValues.clear(); } private static long unitToMicros(long value, ChronoUnit unit) { @@ -241,7 +251,9 @@ public void close() { @Override public Sender doubleArray(CharSequence charSequence, double[] doubles) { - throw new UnsupportedOperationException("not implemented"); + doubleArrayNames.add(charSequence); + doubleArrayValues.add(doubles); + return this; } @Override diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index f8fefc4..11ba4a2 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -577,6 +577,7 @@ private void handleArray(String name, Object value, Schema schema) { } sender.doubleArray(name, doubleArray); } else if (elementType == Schema.Type.ARRAY) { + // todo: handle multidimensional arrays onUnsupportedType(name, "Multidimensional ARRAY"); } else { onUnsupportedType(name, "ARRAY<" + elementType + ">"); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index db47d08..a5853dd 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -2067,6 +2067,7 @@ public void testFloat64ArraySupport(boolean useHttp) { connect.kafka().createTopic(topicName, 1); Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "devices"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); From e719d7e61e6b5f682f06d54c574f37e565ceb26d Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 29 Jul 2025 18:49:12 +0200 Subject: [PATCH 6/9] adjust auth test for conf strings --- .../QuestDBSinkConnectorEmbeddedAuthTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java index 898e9e8..2584f5f 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java @@ -46,7 +46,7 @@ public class QuestDBSinkConnectorEmbeddedAuthTest { private static final GenericContainer tlsProxy = newTlsProxyContainer(); private static GenericContainer newQuestDbConnector() { - FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3.3"); + FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>("questdb/questdb:9.0.1"); container.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT); container.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*")); @@ -96,15 +96,15 @@ public void tearDown() { public void testSmoke(boolean useTls) { connect.kafka().createTopic(topicName, 1); 
Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, false); - props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME); - props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN); - + String confString; if (useTls) { - props.put(QuestDBSinkConnectorConfig.TLS, "true"); - props.put(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure"); - // override the host to point to the TLS proxy - props.put("host", "localhost:" + tlsProxy.getMappedPort(443)); + confString = "tcps::addr=localhost:" + tlsProxy.getMappedPort(443) + ";protocol_version=2;tls_verify=unsafe_off;"; + } else { + confString = "tcp::addr=" + questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT) + ";protocol_version=2;"; } + confString += "username=" + TEST_USER_NAME + ";token=" + TEST_USER_TOKEN + ";"; + // override the original confString with the one that has auth info + props.put("client.conf.string", confString); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); From 9c0a99a022b366f9202c716f709563decdbac96b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 4 Aug 2025 11:50:25 +0200 Subject: [PATCH 7/9] support for 2D and 3D arrays --- .../io/questdb/kafka/QuestDBSinkTask.java | 144 ++++++++++++++++- .../QuestDBSinkConnectorEmbeddedTest.java | 149 ++++++++++++++++++ 2 files changed, 289 insertions(+), 4 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 11ba4a2..a33d7fb 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -566,7 +566,7 @@ private void handleArray(String name, Object value, Schema schema) { if (elementType == Schema.Type.FLOAT32 || elementType == Schema.Type.FLOAT64) { List list = (List) value; - // todo: do not allocate new arrays + // todo: do not allocate new arrays, depends on https://github.com/questdb/questdb/pull/5996 double[] doubleArray = new double[list.size()]; for (int i = 0; i < list.size(); i++) { Object element = list.get(i); @@ -577,8 +577,61 @@ private void handleArray(String name, Object value, Schema schema) { } sender.doubleArray(name, doubleArray); } else if (elementType == Schema.Type.ARRAY) { - // todo: handle multidimensional arrays - onUnsupportedType(name, "Multidimensional ARRAY"); + Schema nestedValueSchema = valueSchema.valueSchema(); + if (nestedValueSchema != null && (nestedValueSchema.type() == Schema.Type.FLOAT32 || nestedValueSchema.type() == Schema.Type.FLOAT64)) { + List list = (List) value; + double[][] doubleArray2D = new double[list.size()][]; + for (int i = 0; i < list.size(); i++) { + Object row = list.get(i); + if (row == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List rowList = (List) row; + doubleArray2D[i] = new double[rowList.size()]; + for (int j = 0; j < rowList.size(); j++) { + Object element = rowList.get(j); + if (element == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + doubleArray2D[i][j] = ((Number) element).doubleValue(); + } + } + sender.doubleArray(name, doubleArray2D); + } else if (nestedValueSchema != null && nestedValueSchema.type() == Schema.Type.ARRAY) { + Schema nestedNestedValueSchema = nestedValueSchema.valueSchema(); + if (nestedNestedValueSchema != 
null && (nestedNestedValueSchema.type() == Schema.Type.FLOAT32 || nestedNestedValueSchema.type() == Schema.Type.FLOAT64)) { + List list = (List) value; + double[][][] doubleArray3D = new double[list.size()][][]; + for (int i = 0; i < list.size(); i++) { + Object matrix = list.get(i); + if (matrix == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List matrixList = (List) matrix; + doubleArray3D[i] = new double[matrixList.size()][]; + for (int j = 0; j < matrixList.size(); j++) { + Object row = matrixList.get(j); + if (row == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List rowList = (List) row; + doubleArray3D[i][j] = new double[rowList.size()]; + for (int k = 0; k < rowList.size(); k++) { + Object element = rowList.get(k); + if (element == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + doubleArray3D[i][j][k] = ((Number) element).doubleValue(); + } + } + } + sender.doubleArray(name, doubleArray3D); + } else { + onUnsupportedType(name, "Multidimensional ARRAY with unsupported element type"); + } + } else { + onUnsupportedType(name, "Multidimensional ARRAY with unsupported element type"); + } } else { onUnsupportedType(name, "ARRAY<" + elementType + ">"); } @@ -609,7 +662,90 @@ private void handleArrayWithoutSchema(String name, List list) { } sender.doubleArray(name, doubleArray); } else if (firstElement instanceof List) { - onUnsupportedType(name, "Multidimensional ARRAY"); + List firstList = (List) firstElement; + if (firstList.isEmpty()) { + throw new InvalidDataException("QuestDB 2D array cannot contain empty rows"); + } + Object firstNestedElement = firstList.get(0); + if (firstNestedElement == null) { + throw new InvalidDataException("QuestDB 2D array elements cannot be null"); + } + + if (firstNestedElement instanceof Number) { + double[][] doubleArray2D = new double[list.size()][]; + for (int i = 0; i < list.size(); i++) { + Object row = list.get(i); + if (row == null) { + throw new InvalidDataException("QuestDB 2D array rows cannot be null"); + } + if (!(row instanceof List)) { + throw new InvalidDataException("QuestDB 2D array rows must be Lists"); + } + List rowList = (List) row; + doubleArray2D[i] = new double[rowList.size()]; + for (int j = 0; j < rowList.size(); j++) { + Object element = rowList.get(j); + if (element == null) { + throw new InvalidDataException("QuestDB 2D array elements cannot be null"); + } + if (!(element instanceof Number)) { + throw new InvalidDataException("QuestDB 2D array elements must be Numbers"); + } + doubleArray2D[i][j] = ((Number) element).doubleValue(); + } + } + sender.doubleArray(name, doubleArray2D); + } else if (firstNestedElement instanceof List) { + List firstNestedList = (List) firstNestedElement; + if (firstNestedList.isEmpty()) { + throw new InvalidDataException("QuestDB 3D array cannot contain empty matrices"); + } + Object firstNestedNestedElement = firstNestedList.get(0); + if (firstNestedNestedElement == null) { + throw new InvalidDataException("QuestDB 3D array elements cannot be null"); + } + + if (firstNestedNestedElement instanceof Number) { + double[][][] doubleArray3D = new double[list.size()][][]; + for (int i = 0; i < list.size(); i++) { + Object matrix = list.get(i); + if (matrix == null) { + throw new InvalidDataException("QuestDB 3D array matrices cannot be null"); + } + if (!(matrix instanceof List)) { + throw new InvalidDataException("QuestDB 
3D array matrices must be Lists"); + } + List matrixList = (List) matrix; + doubleArray3D[i] = new double[matrixList.size()][]; + for (int j = 0; j < matrixList.size(); j++) { + Object row = matrixList.get(j); + if (row == null) { + throw new InvalidDataException("QuestDB 3D array rows cannot be null"); + } + if (!(row instanceof List)) { + throw new InvalidDataException("QuestDB 3D array rows must be Lists"); + } + List rowList = (List) row; + doubleArray3D[i][j] = new double[rowList.size()]; + for (int k = 0; k < rowList.size(); k++) { + Object element = rowList.get(k); + if (element == null) { + throw new InvalidDataException("QuestDB 3D array elements cannot be null"); + } + if (!(element instanceof Number)) { + throw new InvalidDataException("QuestDB 3D array elements must be Numbers"); + } + doubleArray3D[i][j][k] = ((Number) element).doubleValue(); + } + } + } + sender.doubleArray(name, doubleArray3D); + } else { + onUnsupportedType(name, "3D ARRAY with unsupported element type: " + firstNestedNestedElement.getClass().getSimpleName()); + } + } else { + onUnsupportedType(name, "2D ARRAY with unsupported element type: " + firstNestedElement.getClass().getSimpleName()); + } } else { onUnsupportedType(name, "ARRAY<" + firstElement.getClass().getSimpleName() + ">"); } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index a5853dd..47b2bcb 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -2250,4 +2250,153 @@ public void testArrayWithSkipUnsupportedTypes() { httpPort ); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test2DDoubleArraySupport(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with 2D double array + Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema arraySchema = SchemaBuilder.array(innerArraySchema).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Matrix") + .field("matrix_id", Schema.STRING_SCHEMA) + .field("data", arraySchema) + .build(); + + // Create 2D array data: [[1.0, 2.0], [3.0, 4.0]] + Struct struct = new Struct(schema) + .put("matrix_id", "matrix1") + .put("data", Arrays.asList( + Arrays.asList(1.0, 2.0), + Arrays.asList(3.0, 4.0) + )); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"matrix_id\",\"data\"\r\n" + + "\"matrix1\",\"[[1.0,2.0],[3.0,4.0]]\"\r\n", + "select matrix_id, data from " + topicName, + httpPort + ); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test3DDoubleArraySupport(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema 
with 3D double array + Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema middleArraySchema = SchemaBuilder.array(innerArraySchema).build(); + Schema arraySchema = SchemaBuilder.array(middleArraySchema).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.Tensor") + .field("tensor_id", Schema.STRING_SCHEMA) + .field("data", arraySchema) + .build(); + + // Create 3D array data: [[[1.0, 2.0]], [[3.0, 4.0]]] + Struct struct = new Struct(schema) + .put("tensor_id", "tensor1") + .put("data", Arrays.asList( + Arrays.asList(Arrays.asList(1.0, 2.0)), + Arrays.asList(Arrays.asList(3.0, 4.0)) + )); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"tensor_id\",\"data\"\r\n" + + "\"tensor1\",\"[[[1.0,2.0]],[[3.0,4.0]]]\"\r\n", + "select tensor_id, data from " + topicName, + httpPort + ); + } + + @Test + public void testSchemaless2DArraySupport() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Send JSON with 2D array + String json = "{\"experiment\":\"test1\",\"results\":[[1.5,2.5],[3.5,4.5]]}"; + connect.kafka().produce(topicName, json); + + QuestDBUtils.assertSqlEventually( + "\"experiment\",\"results\"\r\n" + + "\"test1\",\"[[1.5,2.5],[3.5,4.5]]\"\r\n", + "select experiment, results from " + topicName, + httpPort + ); + } + + @Test + public void testSchemaless3DArraySupport() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Send JSON with 3D array + String json = "{\"model\":\"cnn1\",\"weights\":[[[0.1,0.2]],[[0.3,0.4]]]}"; + connect.kafka().produce(topicName, json); + + QuestDBUtils.assertSqlEventually( + "\"model\",\"weights\"\r\n" + + "\"cnn1\",\"[[[0.1,0.2]],[[0.3,0.4]]]\"\r\n", + "select model, weights from " + topicName, + httpPort + ); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test2DFloatArraySupport(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with 2D float array + Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT32_SCHEMA).build(); + Schema arraySchema = SchemaBuilder.array(innerArraySchema).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.FloatMatrix") + .field("id", Schema.STRING_SCHEMA) + .field("values", arraySchema) + .build(); + + // Create 2D array data with float values + Struct struct = new Struct(schema) + .put("id", "float_matrix1") + .put("values", Arrays.asList( + Arrays.asList(1.1f, 2.2f), + Arrays.asList(3.3f, 4.4f) + )); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + 
"\"id\",\"values\"\r\n" + + "\"float_matrix1\",\"[[1.100000023841858,2.200000047683716],[3.299999952316284,4.400000095367432]]\"\r\n", + "select id, \"values\" from " + topicName, + httpPort + ); + } } From 491c40ca7e998e25f60048e8e82b6ad518b1554a Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 4 Aug 2025 14:30:01 +0200 Subject: [PATCH 8/9] support for 2D and 3D array in BufferedSender --- .../io/questdb/kafka/BufferingSender.java | 34 ++++++- .../io/questdb/kafka/QuestDBSinkTask.java | 91 ++++++++++++++++--- .../QuestDBSinkConnectorEmbeddedTest.java | 91 +++++++++++++++++++ 3 files changed, 202 insertions(+), 14 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/BufferingSender.java b/connector/src/main/java/io/questdb/kafka/BufferingSender.java index 52a6138..d3ebcbc 100644 --- a/connector/src/main/java/io/questdb/kafka/BufferingSender.java +++ b/connector/src/main/java/io/questdb/kafka/BufferingSender.java @@ -39,6 +39,10 @@ final class BufferingSender implements Sender { private final Set symbolColumns = new HashSet<>(); private final List doubleArrayNames = new ArrayList<>(DEFAULT_CAPACITY); private final List doubleArrayValues = new ArrayList<>(DEFAULT_CAPACITY); + private final List doubleArray2DNames = new ArrayList<>(DEFAULT_CAPACITY); + private final List doubleArray2DValues = new ArrayList<>(DEFAULT_CAPACITY); + private final List doubleArray3DNames = new ArrayList<>(DEFAULT_CAPACITY); + private final List doubleArray3DValues = new ArrayList<>(DEFAULT_CAPACITY); BufferingSender(Sender sender, String symbolColumns) { this.sender = sender; @@ -121,6 +125,12 @@ public void cancelRow() { boolValues.clear(); timestampNames.clear(); timestampValues.clear(); + doubleArrayNames.clear(); + doubleArrayValues.clear(); + doubleArray2DNames.clear(); + doubleArray2DValues.clear(); + doubleArray3DNames.clear(); + doubleArray3DValues.clear(); sender.cancelRow(); } @@ -211,6 +221,22 @@ private void transferFields() { } doubleArrayNames.clear(); doubleArrayValues.clear(); + + for (int i = 0, n = doubleArray2DNames.size(); i < n; i++) { + CharSequence fieldName = doubleArray2DNames.get(i); + double[][] fieldValue = doubleArray2DValues.get(i); + sender.doubleArray(fieldName, fieldValue); + } + doubleArray2DNames.clear(); + doubleArray2DValues.clear(); + + for (int i = 0, n = doubleArray3DNames.size(); i < n; i++) { + CharSequence fieldName = doubleArray3DNames.get(i); + double[][][] fieldValue = doubleArray3DValues.get(i); + sender.doubleArray(fieldName, fieldValue); + } + doubleArray3DNames.clear(); + doubleArray3DValues.clear(); } private static long unitToMicros(long value, ChronoUnit unit) { @@ -258,12 +284,16 @@ public Sender doubleArray(CharSequence charSequence, double[] doubles) { @Override public Sender doubleArray(CharSequence charSequence, double[][] doubles) { - throw new UnsupportedOperationException("not implemented"); + doubleArray2DNames.add(charSequence); + doubleArray2DValues.add(doubles); + return this; } @Override public Sender doubleArray(CharSequence charSequence, double[][][] doubles) { - throw new UnsupportedOperationException("not implemented"); + doubleArray3DNames.add(charSequence); + doubleArray3DValues.add(doubles); + return this; } @Override diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index a33d7fb..5f9cd1f 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ 
-580,12 +580,25 @@ private void handleArray(String name, Object value, Schema schema) { Schema nestedValueSchema = valueSchema.valueSchema(); if (nestedValueSchema != null && (nestedValueSchema.type() == Schema.Type.FLOAT32 || nestedValueSchema.type() == Schema.Type.FLOAT64)) { List list = (List) value; + + // First, validate that all rows have the same length (no jagged arrays) + if (!list.isEmpty()) { + int expectedRowLength = ((List) list.get(0)).size(); + for (int i = 0; i < list.size(); i++) { + Object row = list.get(i); + if (row == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List rowList = (List) row; + if (rowList.size() != expectedRowLength) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but row " + i + " has length: " + rowList.size()); + } + } + } + double[][] doubleArray2D = new double[list.size()][]; for (int i = 0; i < list.size(); i++) { Object row = list.get(i); - if (row == null) { - throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); - } List rowList = (List) row; doubleArray2D[i] = new double[rowList.size()]; for (int j = 0; j < rowList.size(); j++) { @@ -601,19 +614,43 @@ private void handleArray(String name, Object value, Schema schema) { Schema nestedNestedValueSchema = nestedValueSchema.valueSchema(); if (nestedNestedValueSchema != null && (nestedNestedValueSchema.type() == Schema.Type.FLOAT32 || nestedNestedValueSchema.type() == Schema.Type.FLOAT64)) { List list = (List) value; + + // First, validate dimensions for 3D array (no jagged arrays) + if (!list.isEmpty()) { + List firstMatrix = (List) list.get(0); + int expectedMatrixHeight = firstMatrix.size(); + int expectedRowLength = firstMatrix.isEmpty() ? 0 : ((List) firstMatrix.get(0)).size(); + + for (int i = 0; i < list.size(); i++) { + Object matrix = list.get(i); + if (matrix == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List matrixList = (List) matrix; + if (matrixList.size() != expectedMatrixHeight) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All matrices must have the same height. Expected: " + expectedMatrixHeight + ", but matrix " + i + " has height: " + matrixList.size()); + } + + for (int j = 0; j < matrixList.size(); j++) { + Object row = matrixList.get(j); + if (row == null) { + throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); + } + List rowList = (List) row; + if (rowList.size() != expectedRowLength) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. 
Expected: " + expectedRowLength + ", but matrix " + i + " row " + j + " has length: " + rowList.size()); + } + } + } + } + double[][][] doubleArray3D = new double[list.size()][][]; for (int i = 0; i < list.size(); i++) { Object matrix = list.get(i); - if (matrix == null) { - throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); - } List matrixList = (List) matrix; doubleArray3D[i] = new double[matrixList.size()][]; for (int j = 0; j < matrixList.size(); j++) { Object row = matrixList.get(j); - if (row == null) { - throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays"); - } List rowList = (List) row; doubleArray3D[i][j] = new double[rowList.size()]; for (int k = 0; k < rowList.size(); k++) { @@ -672,7 +709,8 @@ private void handleArrayWithoutSchema(String name, List list) { } if (firstNestedElement instanceof Number) { - double[][] doubleArray2D = new double[list.size()][]; + // First, validate that all rows have the same length (no jagged arrays) + int expectedRowLength = firstList.size(); for (int i = 0; i < list.size(); i++) { Object row = list.get(i); if (row == null) { @@ -682,6 +720,15 @@ private void handleArrayWithoutSchema(String name, List list) { throw new InvalidDataException("QuestDB 2D array rows must be Lists"); } List rowList = (List) row; + if (rowList.size() != expectedRowLength) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but row " + i + " has length: " + rowList.size()); + } + } + + double[][] doubleArray2D = new double[list.size()][]; + for (int i = 0; i < list.size(); i++) { + Object row = list.get(i); + List rowList = (List) row; doubleArray2D[i] = new double[rowList.size()]; for (int j = 0; j < rowList.size(); j++) { Object element = rowList.get(j); @@ -706,7 +753,10 @@ private void handleArrayWithoutSchema(String name, List list) { } if (firstNestedNestedElement instanceof Number) { - double[][][] doubleArray3D = new double[list.size()][][]; + // First, validate dimensions for 3D array (no jagged arrays) + int expectedMatrixHeight = firstNestedList.size(); + int expectedRowLength = firstNestedList.size() > 0 ? ((List) firstNestedList.get(0)).size() : 0; + for (int i = 0; i < list.size(); i++) { Object matrix = list.get(i); if (matrix == null) { @@ -716,7 +766,10 @@ private void handleArrayWithoutSchema(String name, List list) { throw new InvalidDataException("QuestDB 3D array matrices must be Lists"); } List matrixList = (List) matrix; - doubleArray3D[i] = new double[matrixList.size()][]; + if (matrixList.size() != expectedMatrixHeight) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All matrices must have the same height. Expected: " + expectedMatrixHeight + ", but matrix " + i + " has height: " + matrixList.size()); + } + for (int j = 0; j < matrixList.size(); j++) { Object row = matrixList.get(j); if (row == null) { @@ -726,6 +779,20 @@ private void handleArrayWithoutSchema(String name, List list) { throw new InvalidDataException("QuestDB 3D array rows must be Lists"); } List rowList = (List) row; + if (rowList.size() != expectedRowLength) { + throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. 
Expected: " + expectedRowLength + ", but matrix " + i + " row " + j + " has length: " + rowList.size()); + } + } + } + + double[][][] doubleArray3D = new double[list.size()][][]; + for (int i = 0; i < list.size(); i++) { + Object matrix = list.get(i); + List matrixList = (List) matrix; + doubleArray3D[i] = new double[matrixList.size()][]; + for (int j = 0; j < matrixList.size(); j++) { + Object row = matrixList.get(j); + List rowList = (List) row; doubleArray3D[i][j] = new double[rowList.size()]; for (int k = 0; k < rowList.size(); k++) { Object element = rowList.get(k); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 47b2bcb..0606670 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -2399,4 +2399,95 @@ public void test2DFloatArraySupport(boolean useHttp) { httpPort ); } + + @Test + public void testJaggedArrayRejection() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + props.put("errors.tolerance", "none"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // send JSON with jagged 2D array (different row lengths) + String json = "{\"experiment\":\"jagged\",\"results\":[[1.5,2.5,3.5],[4.5,5.5]]}"; + connect.kafka().produce(topicName, json); + + ConnectTestUtils.assertConnectorTaskFailedEventually(connect); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test2DArrayWithSymbolColumns(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "sensor_id"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with 2D double array and symbol column + Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema arraySchema = SchemaBuilder.array(innerArraySchema).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.MatrixWithSymbol") + .field("sensor_id", Schema.STRING_SCHEMA) + .field("readings", arraySchema) + .build(); + + // Create 2D array data: [[1.1, 2.2], [3.3, 4.4]] + Struct struct = new Struct(schema) + .put("sensor_id", "sensor-001") + .put("readings", Arrays.asList( + Arrays.asList(1.1, 2.2), + Arrays.asList(3.3, 4.4) + )); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"sensor_id\",\"readings\"\r\n" + + "\"sensor-001\",\"[[1.1,2.2],[3.3,4.4]]\"\r\n", + "select sensor_id, readings from " + topicName, + httpPort + ); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test3DArrayWithSymbolColumns(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + 
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "device_id"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // Create schema with 3D double array and symbol column + Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); + Schema middleArraySchema = SchemaBuilder.array(innerArraySchema).build(); + Schema arraySchema = SchemaBuilder.array(middleArraySchema).build(); + Schema schema = SchemaBuilder.struct() + .name("com.example.TensorWithSymbol") + .field("device_id", Schema.STRING_SCHEMA) + .field("data", arraySchema) + .build(); + + // Create 3D array data: [[[1.0, 2.0]], [[3.0, 4.0]]] + Struct struct = new Struct(schema) + .put("device_id", "device-alpha") + .put("data", Arrays.asList( + Arrays.asList(Arrays.asList(1.0, 2.0)), + Arrays.asList(Arrays.asList(3.0, 4.0)) + )); + + connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct))); + + QuestDBUtils.assertSqlEventually( + "\"device_id\",\"data\"\r\n" + + "\"device-alpha\",\"[[[1.0,2.0]],[[3.0,4.0]]]\"\r\n", + "select device_id, data from " + topicName, + httpPort + ); + } } From 1533491867c131cafdac13b44239438a4408030e Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 4 Aug 2025 15:13:44 +0200 Subject: [PATCH 9/9] regression fixed --- connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 5f9cd1f..334e987 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -754,8 +754,8 @@ private void handleArrayWithoutSchema(String name, List list) { if (firstNestedNestedElement instanceof Number) { // First, validate dimensions for 3D array (no jagged arrays) - int expectedMatrixHeight = firstNestedList.size(); - int expectedRowLength = firstNestedList.size() > 0 ? ((List) firstNestedList.get(0)).size() : 0; + int expectedMatrixHeight = firstList.size(); + int expectedRowLength = firstNestedList.size(); for (int i = 0; i < list.size(); i++) { Object matrix = list.get(i);
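
For reference, below is a producer-side sketch of how a schema'd record with a double-array field can be built so that the connector, with the array support added in this series, writes it to a QuestDB array column. It mirrors the test pattern in QuestDBSinkConnectorEmbeddedTest; the class name, topic handling and converter configuration shown here are illustrative only and are not part of the patch.

    // Illustrative sketch only -- mirrors the embedded tests above, not part of this patch.
    // Assumes a sink connector configured with a JsonConverter and a
    // client.conf.string pointing at QuestDB (e.g. "http::addr=localhost:9000;").
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverter;

    import java.util.Arrays;
    import java.util.Collections;

    public final class ArrayRecordExample {
        public static byte[] buildPayload(String topic) {
            // A FLOAT64 (or FLOAT32) element schema maps to a QuestDB double array column;
            // 2D/3D arrays are expressed by nesting SchemaBuilder.array(...) calls.
            Schema readingsSchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
            Schema schema = SchemaBuilder.struct()
                    .name("com.example.Measurement")
                    .field("sensor_id", Schema.STRING_SCHEMA)
                    .field("readings", readingsSchema)
                    .build();

            Struct value = new Struct(schema)
                    .put("sensor_id", "sensor1")
                    .put("readings", Arrays.asList(23.5, 24.1, 23.8));

            // Serialize the same way the embedded tests do before producing to Kafka.
            JsonConverter converter = new JsonConverter();
            converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
            return converter.fromConnectData(topic, schema, value);
        }
    }

In schema-less mode the same payload would simply be JSON, e.g. {"sensor_id":"sensor1","readings":[23.5,24.1,23.8]}; as the tests above assert, every numeric element is then coerced to double, jagged arrays are rejected, and schema'd integer arrays fail the task unless skip.unsupported.types is enabled.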