Skip to content

Commit 3f09d36

Browse files
committed
better handling of debezium temporal types
1 parent ba01140 commit 3f09d36

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.questdb.kafka;
22

33
import io.questdb.client.Sender;
4+
import io.questdb.std.datetime.microtime.Timestamps;
45
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
56
import org.apache.kafka.common.TopicPartition;
67
import org.apache.kafka.connect.data.Date;
@@ -234,6 +235,11 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
234235
long l = (Long) value;
235236
sender.timestampColumn(name, l);
236237
return true;
238+
case "io.debezium.time.Date":
239+
int i = (Integer) value;
240+
long micros = Timestamps.addDays(0, i);
241+
sender.timestampColumn(name, micros);
242+
return true;
237243
case Timestamp.LOGICAL_NAME:
238244
case Date.LOGICAL_NAME:
239245
java.util.Date d = (java.util.Date) value;

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,48 @@ public void testEventTime() throws SQLException {
240240
}
241241
}
242242

243+
@Test
244+
public void testEventTimeMicros() throws SQLException {
245+
String questTableName = "test_event_time_micros";
246+
try (Connection connection = getConnection(postgresContainer);
247+
Statement statement = connection.createStatement()) {
248+
249+
statement.execute("create schema " + PG_SCHEMA_NAME);
250+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp(6), primary key (id))");
251+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.123456Z')");
252+
253+
startDebeziumConnector();
254+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
255+
questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");
256+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
257+
258+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"
259+
+ "1,\"Learn CDC\",\"2021-01-02T01:02:03.123456Z\"\r\n",
260+
"select id, title, timestamp from " + questTableName);
261+
}
262+
}
263+
264+
@Test
265+
public void testEventTimeNanos() throws SQLException {
266+
String questTableName = "test_event_time_nanos";
267+
try (Connection connection = getConnection(postgresContainer);
268+
Statement statement = connection.createStatement()) {
269+
270+
statement.execute("create schema " + PG_SCHEMA_NAME);
271+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp(9), primary key (id))");
272+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.123456789Z')");
273+
274+
startDebeziumConnector();
275+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
276+
questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");
277+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
278+
279+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"
280+
+ "1,\"Learn CDC\",\"2021-01-02T01:02:03.123457Z\"\r\n",
281+
"select id, title, timestamp from " + questTableName);
282+
}
283+
}
284+
243285
@Test
244286
public void testNonDesignatedTimestamp() throws SQLException {
245287
String questTableName = "test_non_designated_timestamp";
@@ -260,6 +302,26 @@ public void testNonDesignatedTimestamp() throws SQLException {
260302
}
261303
}
262304

305+
@Test
306+
public void testDate() throws SQLException {
307+
String questTableName = "test_date";
308+
try (Connection connection = getConnection(postgresContainer);
309+
Statement statement = connection.createStatement()) {
310+
311+
statement.execute("create schema " + PG_SCHEMA_NAME);
312+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at date, primary key (id))");
313+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02')");
314+
315+
startDebeziumConnector();
316+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
317+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
318+
319+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n"
320+
+ "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n",
321+
"select id, title, created_at from " + questTableName);
322+
}
323+
}
324+
263325
private static void startDebeziumConnector() {
264326
ConnectorConfiguration connector = ConnectorConfiguration
265327
.forJdbcContainer(postgresContainer)

0 commit comments

Comments
 (0)