Skip to content

Commit ba01140

Browse files
committed
improve handling of Debezium timestamps
1 parent c8a77f1 commit ba01140

File tree

2 files changed

+135
-15
lines changed

2 files changed

+135
-15
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public final class QuestDBSinkTask extends SinkTask {
3030
private Sender sender;
3131
private QuestDBSinkConnectorConfig config;
3232
private String timestampColumnName;
33-
private Object timestampColumnValue;
33+
private long timestampColumnValue = Long.MIN_VALUE;
3434

3535
@Override
3636
public String version() {
@@ -53,7 +53,7 @@ public void put(Collection<SinkRecord> collection) {
5353
}
5454

5555
private void handleSingleRecord(SinkRecord record) {
56-
assert timestampColumnValue == null;
56+
assert timestampColumnValue == Long.MIN_VALUE;
5757
String explicitTable = config.getTable();
5858
String tableName = explicitTable == null ? record.topic() : explicitTable;
5959
sender.table(tableName);
@@ -64,17 +64,11 @@ private void handleSingleRecord(SinkRecord record) {
6464
}
6565
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
6666

67-
if (timestampColumnValue == null) {
67+
if (timestampColumnValue == Long.MIN_VALUE) {
6868
sender.atNow();
6969
} else {
70-
if (timestampColumnValue instanceof Long) {
71-
sender.at(TimeUnit.MILLISECONDS.toNanos((Long) timestampColumnValue));
72-
} else if (timestampColumnValue instanceof java.util.Date) {
73-
sender.at(TimeUnit.MILLISECONDS.toNanos(((java.util.Date) timestampColumnValue).getTime()));
74-
} else {
75-
throw new ConnectException("Unsupported timestamp column type: " + timestampColumnValue.getClass());
76-
}
77-
timestampColumnValue = null;
70+
sender.at(timestampColumnValue);
71+
timestampColumnValue = Long.MIN_VALUE;
7872
}
7973
}
8074

@@ -106,11 +100,11 @@ private boolean isDesignatedColumnName(String name, String fallbackName) {
106100
private void handleObject(String name, Schema schema, Object value, String fallbackName) {
107101
assert !name.isEmpty() || !fallbackName.isEmpty();
108102
if (isDesignatedColumnName(name, fallbackName)) {
109-
assert timestampColumnValue == null;
103+
assert timestampColumnValue == Long.MIN_VALUE;
110104
if (value == null) {
111105
throw new ConnectException("Timestamp column value cannot be null");
112106
}
113-
timestampColumnValue = value;
107+
timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
114108
return;
115109
}
116110
if (value == null) {
@@ -119,13 +113,35 @@ private void handleObject(String name, Schema schema, Object value, String fallb
119113
if (tryWriteLogicalType(name.isEmpty() ? fallbackName : name, schema, value)) {
120114
return;
121115
}
122-
// ok, not a known logical try, try primitive types
116+
// ok, not a known logical type, try primitive types
123117
if (tryWritePhysicalTypeFromSchema(name, schema, value, fallbackName)) {
124118
return;
125119
}
126120
writePhysicalTypeWithoutSchema(name, value, fallbackName);
127121
}
128122

123+
private static long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
124+
if (value instanceof java.util.Date) {
125+
return TimeUnit.MILLISECONDS.toNanos(((java.util.Date) value).getTime());
126+
}
127+
if (!(value instanceof Long)) {
128+
throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
129+
}
130+
long longValue = (Long) value;
131+
TimeUnit inputUnit;
132+
if (schema == null || schema.name() == null) {
133+
// no schema, assuming millis since epoch
134+
inputUnit = TimeUnit.MILLISECONDS;
135+
} else if ("io.debezium.time.MicroTimestamp".equals(schema.name())) {
136+
// special case: Debezium micros since epoch
137+
inputUnit = TimeUnit.MICROSECONDS;
138+
} else {
139+
// no idea what's that, let's assume it's again millis since epoch and hope for the best
140+
inputUnit = TimeUnit.MILLISECONDS;
141+
}
142+
return inputUnit.toNanos(longValue);
143+
}
144+
129145
private void writePhysicalTypeWithoutSchema(String name, Object value, String fallbackName) {
130146
if (value == null) {
131147
return;
@@ -214,6 +230,10 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
214230
return false;
215231
}
216232
switch (schema.name()) {
233+
case "io.debezium.time.MicroTimestamp":
234+
long l = (Long) value;
235+
sender.timestampColumn(name, l);
236+
return true;
217237
case Timestamp.LOGICAL_NAME:
218238
case Date.LOGICAL_NAME:
219239
java.util.Date d = (java.util.Date) value;

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.sql.Connection;
2929
import java.sql.DriverManager;
30+
import java.sql.ResultSet;
3031
import java.sql.SQLException;
3132
import java.sql.Statement;
3233
import java.util.stream.Stream;
@@ -65,14 +66,15 @@ public class DebeziumIT {
6566
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb-connector.jar")
6667
.withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb.jar")
6768
.dependsOn(kafkaContainer)
69+
// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium")))
6870
.withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true")
6971
.withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true");
7072

7173
@Container
7274
private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")
7375
.withNetwork(network)
7476
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
75-
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
77+
// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
7678
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
7779
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
7880

@@ -160,6 +162,104 @@ public void testSchemaChange() throws Exception {
160162
}
161163
}
162164

165+
@Test
166+
public void testUpdatesChange() throws Exception {
167+
String questTableName = "test_updates_change";
168+
try (Connection connection = getConnection(postgresContainer);
169+
Statement statement = connection.createStatement()) {
170+
171+
statement.execute("create schema " + PG_SCHEMA_NAME);
172+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");
173+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
174+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
175+
176+
startDebeziumConnector();
177+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
178+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
179+
180+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
181+
+ "1,\"Learn CDC\"\r\n",
182+
"select id, title from " + questTableName);
183+
184+
statement.executeUpdate("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set title = 'Learn Debezium' where id = 1");
185+
186+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
187+
+ "1,\"Learn CDC\"\r\n"
188+
+ "1,\"Learn Debezium\"\r\n",
189+
"select id, title from " + questTableName);
190+
}
191+
}
192+
193+
@Test
194+
public void testInsertThenDeleteThenInsertAgain() throws Exception {
195+
String questTableName = "test_insert_then_delete_then_insert_again";
196+
try (Connection connection = getConnection(postgresContainer);
197+
Statement statement = connection.createStatement()) {
198+
199+
statement.execute("create schema " + PG_SCHEMA_NAME);
200+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");
201+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
202+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
203+
204+
startDebeziumConnector();
205+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
206+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
207+
208+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
209+
+ "1,\"Learn CDC\"\r\n",
210+
"select id, title from " + questTableName);
211+
212+
statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1");
213+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn Debezium')");
214+
215+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
216+
+ "1,\"Learn CDC\"\r\n"
217+
+ "1,\"Learn Debezium\"\r\n",
218+
"select id, title from " + questTableName);
219+
}
220+
}
221+
222+
@Test
223+
public void testEventTime() throws SQLException {
224+
String questTableName = "test_event_time";
225+
try (Connection connection = getConnection(postgresContainer);
226+
Statement statement = connection.createStatement()) {
227+
228+
statement.execute("create schema " + PG_SCHEMA_NAME);
229+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");
230+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.456Z')");
231+
232+
startDebeziumConnector();
233+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
234+
questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");
235+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
236+
237+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"
238+
+ "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n",
239+
"select id, title, timestamp from " + questTableName);
240+
}
241+
}
242+
243+
@Test
244+
public void testNonDesignatedTimestamp() throws SQLException {
245+
String questTableName = "test_non_designated_timestamp";
246+
try (Connection connection = getConnection(postgresContainer);
247+
Statement statement = connection.createStatement()) {
248+
249+
statement.execute("create schema " + PG_SCHEMA_NAME);
250+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");
251+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.456Z')");
252+
253+
startDebeziumConnector();
254+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
255+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
256+
257+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n"
258+
+ "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n",
259+
"select id, title, created_at from " + questTableName);
260+
}
261+
}
262+
163263
private static void startDebeziumConnector() {
164264
ConnectorConfiguration connector = ConnectorConfiguration
165265
.forJdbcContainer(postgresContainer)

0 commit comments

Comments
 (0)