connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
@@ -79,6 +79,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";

public static final String DLQ_SEND_BATCH_ON_ERROR_CONFIG = "dlq.send.batch.on.error";
private static final String DLQ_SEND_BATCH_ON_ERROR_DOC = "When true and a Dead Letter Queue (DLQ) is configured, " +
"send the entire batch to DLQ on parsing errors instead of trying to send records one-by-one to the database first. " +
"This can be useful to avoid additional database load when errors are expected to affect multiple records. " +
"Default is false (try one-by-one).";

private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";

public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -111,7 +117,8 @@ public static ConfigDef conf() {
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC)
.define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC);
.define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC)
.define(DLQ_SEND_BATCH_ON_ERROR_CONFIG, Type.BOOLEAN, false, Importance.LOW, DLQ_SEND_BATCH_ON_ERROR_DOC);
}

public Password getConfigurationString() {
@@ -212,6 +219,10 @@ public int getMaxRetries() {
return getInt(MAX_RETRIES);
}

public boolean isDlqSendBatchOnError() {
return getBoolean(DLQ_SEND_BATCH_ON_ERROR_CONFIG);
}

private static class TimestampUnitsRecommender implements ConfigDef.Recommender {
private static final TimestampUnitsRecommender INSTANCE = new TimestampUnitsRecommender();
private static final List<Object> VALID_UNITS = Arrays.asList("auto", "millis", "micros", "nanos");
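For context, a minimal sketch of how the new option could be enabled alongside the standard Kafka Connect dead-letter-queue settings. It mirrors the integration test further below; the connector class name and the topic value are placeholders, not taken from this change.

import java.util.HashMap;
import java.util.Map;

final class DlqBatchOnErrorExample {
    // Builds connector properties that enable the new batch-to-DLQ behaviour.
    static Map<String, String> connectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.questdb.kafka.QuestDBSinkConnector"); // placeholder class name
        props.put("topics", "example-topic");                                  // placeholder topic
        // Standard Kafka Connect DLQ settings, as also used in the integration test below
        props.put("errors.tolerance", "all");
        props.put("errors.deadletterqueue.topic.name", "dlq");
        props.put("errors.deadletterqueue.topic.replication.factor", "1");
        // New option from this change: on a server-side parsing error, report the whole
        // in-flight batch to the DLQ instead of retrying records one by one against QuestDB
        props.put("dlq.send.batch.on.error", "true");
        return props;
    }
}

With the option left at its default (false), the task keeps the existing behaviour of re-sending records individually so that only the failing records are reported.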
connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java (26 additions, 12 deletions)
@@ -52,6 +52,7 @@ public final class QuestDBSinkTask extends SinkTask {
private final FlushConfig flushConfig = new FlushConfig();
private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
private ErrantRecordReporter reporter;
private boolean dlqSendBatchOnError;

@Override
public String version() {
@@ -96,6 +97,7 @@ public void start(Map<String, String> map) {
// Kafka older than 2.6
reporter = null;
}
this.dlqSendBatchOnError = config.isDlqSendBatchOnError();
}

private Sender createRawSender() {
@@ -275,18 +277,30 @@ private void onHttpSenderException(Exception e) {
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating at server-side
&& (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
) {
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
// and we will report it to the error handler. the rest of the records will make it to QuestDB
sender = createSender();
for (int i = 0; i < inflightSinkRecords.size(); i++) {
SinkRecord sinkRecord = inflightSinkRecords.get(i);
try {
handleSingleRecord(sinkRecord);
sender.flush();
} catch (Exception ex) {
context.errantRecordReporter().report(sinkRecord, ex);
closeSenderSilently();
sender = createSender();
if (dlqSendBatchOnError) {
// Send all in-flight records directly to the DLQ without trying to send them to the database
log.warn("Sender exception, sending entire batch to DLQ. Inflight record count = {}", inflightSinkRecords.size(), e);
for (int i = 0; i < inflightSinkRecords.size(); i++) {
SinkRecord sinkRecord = inflightSinkRecords.get(i);
log.debug("Reporting record to Kafka Connect error handler (DLQ)...");
context.errantRecordReporter().report(sinkRecord, e);
}
} else {
// ok, we have a parsing error. let's try to send records one by one to find the problematic record(s);
// those are reported to the error handler and the rest of the records will still make it to QuestDB
log.warn("Sender exception, sending records one by one to isolate the problematic ones. Inflight record count = {}", inflightSinkRecords.size(), e);
sender = createSender();
for (int i = 0; i < inflightSinkRecords.size(); i++) {
SinkRecord sinkRecord = inflightSinkRecords.get(i);
try {
handleSingleRecord(sinkRecord);
sender.flush();
} catch (Exception ex) {
log.warn("Failed to send problematic record to QuestDB. Reporting to Kafka Connect error handler (DQL)...", ex);
context.errantRecordReporter().report(sinkRecord, ex);
closeSenderSilently();
sender = createSender();
}
}
}
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
@@ -585,6 +585,48 @@ public void testDeadLetterQueue_badColumnType() {

}

@Test
public void testDeadLetterQueue_sendBatchOnError() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("dlq.send.batch.on.error", "true");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

// When dlq.send.batch.on.error is true, every record in a batch that contains an error goes to the DLQ,
// so all 5 records should end up in the DLQ, not just the 2 bad ones
ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(5, 120_000, "dlq");
Assertions.assertEquals(5, fetchedRecords.count());

// Verify that NO records made it to QuestDB
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n",
"select firstname,lastname,age, id from " + topicName,
httpPort);
}

@Test
public void testbadColumnType_noDLQ() {
connect.kafka().createTopic(topicName, 1);