From 620a3319fe6222d76b33d1b6428b8df2ecaf6cab Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Wed, 29 Oct 2025 15:17:08 +0100
Subject: [PATCH 1/2] more logging

---
 connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 334e987..3d46254 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -277,6 +277,7 @@ private void onHttpSenderException(Exception e) {
         ) {
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
+            log.debug("Sender exception, trying to send records one by one to find the problematic one. Inflight record count = {}", inflightSinkRecords.size(), e);
             sender = createSender();
             for (int i = 0; i < inflightSinkRecords.size(); i++) {
                 SinkRecord sinkRecord = inflightSinkRecords.get(i);
@@ -284,6 +285,7 @@
                     handleSingleRecord(sinkRecord);
                     sender.flush();
                 } catch (Exception ex) {
+                    log.debug("Failed to send problematic record to QuestDB. Reporting to Kafka Connect error handler (DLQ)...", ex);
                     context.errantRecordReporter().report(sinkRecord, ex);
                     closeSenderSilently();
                     sender = createSender();

From 65f51fbd153e52ffd40bbc82306b786a7db5e42b Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Wed, 29 Oct 2025 15:46:28 +0100
Subject: [PATCH 2/2] feat: a config flag to send a whole failed batch to DLQ

---
 .../kafka/QuestDBSinkConnectorConfig.java     | 13 +++++-
 .../io/questdb/kafka/QuestDBSinkTask.java     | 40 +++++++++++-------
 .../QuestDBSinkConnectorEmbeddedTest.java     | 42 +++++++++++++++++++
 3 files changed, 80 insertions(+), 15 deletions(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
index 44f21ec..0c80587 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
@@ -79,6 +79,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
     public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
     private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";
 
+    public static final String DLQ_SEND_BATCH_ON_ERROR_CONFIG = "dlq.send.batch.on.error";
+    private static final String DLQ_SEND_BATCH_ON_ERROR_DOC = "When true and a Dead Letter Queue (DLQ) is configured, " +
+            "send the entire batch to the DLQ on parsing errors instead of trying to send records one-by-one to the database first. " +
+            "This can be useful to avoid additional database load when errors are expected to affect multiple records. " +
" + + "Default is false (try one-by-one)."; + private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ"; public QuestDBSinkConnectorConfig(ConfigDef config, Map parsedConfig) { @@ -111,7 +117,8 @@ public static ConfigDef conf() { .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC) .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC) .define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC) - .define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC); + .define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC) + .define(DLQ_SEND_BATCH_ON_ERROR_CONFIG, Type.BOOLEAN, false, Importance.LOW, DLQ_SEND_BATCH_ON_ERROR_DOC); } public Password getConfigurationString() { @@ -212,6 +219,10 @@ public int getMaxRetries() { return getInt(MAX_RETRIES); } + public boolean isDlqSendBatchOnError() { + return getBoolean(DLQ_SEND_BATCH_ON_ERROR_CONFIG); + } + private static class TimestampUnitsRecommender implements ConfigDef.Recommender { private static final TimestampUnitsRecommender INSTANCE = new TimestampUnitsRecommender(); private static final List VALID_UNITS = Arrays.asList("auto", "millis", "micros", "nanos"); diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 3d46254..1060409 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -52,6 +52,7 @@ public final class QuestDBSinkTask extends SinkTask { private final FlushConfig flushConfig = new FlushConfig(); private final ObjList inflightSinkRecords = new ObjList<>(); private ErrantRecordReporter reporter; + private boolean dlqSendBatchOnError; @Override public String version() { @@ -96,6 +97,7 @@ public void start(Map map) { // Kafka older than 2.6 reporter = null; } + this.dlqSendBatchOnError = config.isDlqSendBatchOnError(); } private Sender createRawSender() { @@ -275,20 +277,30 @@ private void onHttpSenderException(Exception e) { (reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating at server-side && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol")) ) { - // ok, we have a parsing error, let's try to send records one by one to find the problematic record - // and we will report it to the error handler. the rest of the records will make it to QuestDB - log.debug("Sender exception, trying to send problematic record one by one. Inflight record size = {}", inflightSinkRecords.size(), e); - sender = createSender(); - for (int i = 0; i < inflightSinkRecords.size(); i++) { - SinkRecord sinkRecord = inflightSinkRecords.get(i); - try { - handleSingleRecord(sinkRecord); - sender.flush(); - } catch (Exception ex) { - log.debug("Failed to send problematic record to QuestDB. Reporting to Kafka Connect error handler (DQL)...", ex); - context.errantRecordReporter().report(sinkRecord, ex); - closeSenderSilently(); - sender = createSender(); + if (dlqSendBatchOnError) { + // Send all records directly to DLQ without trying to send them to database + log.warn("Sender exception, sending entire batch to DLQ. 
+                for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                    SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                    log.debug("Reporting record to Kafka Connect error handler (DLQ)...");
+                    context.errantRecordReporter().report(sinkRecord, e);
+                }
+            } else {
+                // ok, we have a parsing error, let's try to send records one by one to find the problematic record
+                // and we will report it to the error handler. the rest of the records will make it to QuestDB
+                log.warn("Sender exception, trying to send records one by one to find the problematic one. Inflight record count = {}", inflightSinkRecords.size(), e);
+                sender = createSender();
+                for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                    SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                    try {
+                        handleSingleRecord(sinkRecord);
+                        sender.flush();
+                    } catch (Exception ex) {
+                        log.warn("Failed to send problematic record to QuestDB. Reporting to Kafka Connect error handler (DLQ)...", ex);
+                        context.errantRecordReporter().report(sinkRecord, ex);
+                        closeSenderSilently();
+                        sender = createSender();
+                    }
                 }
             }
             nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;

diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 0606670..738e3f1 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -585,6 +585,48 @@ public void testDeadLetterQueue_badColumnType() {
 
     }
 
+    @Test
+    public void testDeadLetterQueue_sendBatchOnError() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        props.put("dlq.send.batch.on.error", "true");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        // When dlq.send.batch.on.error is true, ALL records in a batch that contains errors should go to the DLQ.
+        // This means all 5 records should be in the DLQ, not just the 2 bad ones.
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(5, 120_000, "dlq");
+        Assertions.assertEquals(5, fetchedRecords.count());
+
+        // Verify that NO records made it to QuestDB
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n",
+                "select firstname, lastname, age, id from " + topicName,
+                httpPort);
+    }
+
     @Test
     public void testbadColumnType_noDLQ() {
         connect.kafka().createTopic(topicName, 1);
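
Usage note: a minimal sketch of a standalone connector properties file that enables the new behavior. The errors.* and dlq.send.batch.on.error keys mirror the ones exercised by the embedded test above; the connector name, topics, and the connector.class value are assumptions for illustration, not taken from this patch:

    # hypothetical config; name, topic, and connector class are placeholders
    name=questdb-sink
    connector.class=io.questdb.kafka.QuestDBSinkConnector
    topics=example-topic
    errors.tolerance=all
    errors.deadletterqueue.topic.name=dlq
    errors.deadletterqueue.topic.replication.factor=1
    dlq.send.batch.on.error=true

With dlq.send.batch.on.error=true, a server-side parsing error routes the whole in-flight batch to the DLQ in one pass; with the default (false), the task replays the batch record by record so that only the offending records are reported and the rest still reach QuestDB.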