Skip to content

Commit efeab77

Browse files
authored
Merge pull request #5 from questdb/feat/timestamp-unit-detection
[feat] timestamp unit configuration
2 parents 41b2d50 + 49fa598 commit efeab77

File tree

7 files changed

+198
-11
lines changed

7 files changed

+198
-11
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import org.apache.kafka.common.config.ConfigDef;
55
import org.apache.kafka.common.config.ConfigDef.Importance;
66
import org.apache.kafka.common.config.ConfigDef.Type;
7+
import org.apache.kafka.connect.errors.ConnectException;
78

89
import java.util.Map;
10+
import java.util.concurrent.TimeUnit;
911

1012
public final class QuestDBSinkConnectorConfig extends AbstractConfig {
1113
public static final String HOST_CONFIG = "host";
@@ -26,6 +28,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
2628
public static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.field.name";
2729
private static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC = "Designated timestamp field name";
2830

31+
public static final String TIMESTAMP_UNITS_CONFIG = "timestamp.units";
32+
private static final String TIMESTAMP_UNITS_DOC = "Units of timestamp field. Possible values: auto, millis, micros, nanos";
33+
2934
public static final String INCLUDE_KEY_CONFIG = "include.key";
3035
private static final String INCLUDE_KEY_DOC = "Include key in the table";
3136

@@ -61,7 +66,8 @@ public static ConfigDef conf() {
6166
.define(SYMBOL_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, SYMBOL_COLUMNS_DOC)
6267
.define(USERNAME, Type.STRING, "admin", Importance.MEDIUM, USERNAME_DOC)
6368
.define(TOKEN, Type.PASSWORD, null, Importance.MEDIUM, TOKEN_DOC)
64-
.define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC);
69+
.define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC)
70+
.define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC);
6571
}
6672

6773
public String getHost() {
@@ -107,4 +113,20 @@ public String getToken() {
107113
public boolean isTls() {
108114
return getBoolean(TLS);
109115
}
116+
117+
public TimeUnit getTimestampUnitsOrNull() {
118+
String configured = getString(TIMESTAMP_UNITS_CONFIG);
119+
switch (configured) {
120+
case "millis":
121+
return TimeUnit.MILLISECONDS;
122+
case "micros":
123+
return TimeUnit.MICROSECONDS;
124+
case "nanos":
125+
return TimeUnit.NANOSECONDS;
126+
case "auto":
127+
return null;
128+
default:
129+
throw new ConnectException("Unknown timestamp units mode: " + configured + ". Possible values: auto, millis, micros, nanos");
130+
}
131+
}
110132
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public final class QuestDBSinkTask extends SinkTask {
3232
private QuestDBSinkConnectorConfig config;
3333
private String timestampColumnName;
3434
private long timestampColumnValue = Long.MIN_VALUE;
35+
private TimeUnit timestampUnits;
3536

3637
@Override
3738
public String version() {
@@ -43,6 +44,7 @@ public void start(Map<String, String> map) {
4344
this.config = new QuestDBSinkConnectorConfig(map);
4445
this.sender = createSender();
4546
this.timestampColumnName = config.getDesignatedTimestampColumnName();
47+
this.timestampUnits = config.getTimestampUnitsOrNull();
4648
}
4749

4850
private Sender createSender() {
@@ -153,7 +155,7 @@ private void handleObject(String name, Schema schema, Object value, String fallb
153155
writePhysicalTypeWithoutSchema(name, value, fallbackName);
154156
}
155157

156-
private static long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
158+
private long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
157159
if (value instanceof java.util.Date) {
158160
return TimeUnit.MILLISECONDS.toNanos(((java.util.Date) value).getTime());
159161
}
@@ -162,15 +164,11 @@ private static long resolveDesignatedTimestampColumnValue(Object value, Schema s
162164
}
163165
long longValue = (Long) value;
164166
TimeUnit inputUnit;
165-
if (schema == null || schema.name() == null) {
166-
// no schema, assuming millis since epoch
167-
inputUnit = TimeUnit.MILLISECONDS;
168-
} else if ("io.debezium.time.MicroTimestamp".equals(schema.name())) {
167+
if (schema == null || !"io.debezium.time.MicroTimestamp".equals(schema.name())) {
168+
inputUnit = TimestampHelper.getTimestampUnits(timestampUnits, longValue);
169+
} else {
169170
// special case: Debezium micros since epoch
170171
inputUnit = TimeUnit.MICROSECONDS;
171-
} else {
172-
// no idea what's that, let's assume it's again millis since epoch and hope for the best
173-
inputUnit = TimeUnit.MILLISECONDS;
174172
}
175173
return inputUnit.toNanos(longValue);
176174
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.questdb.kafka;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
public final class TimestampHelper {
6+
private TimestampHelper() {
7+
}
8+
9+
static TimeUnit guessTimestampUnits(long timestamp) {
10+
if (timestamp < 10000000000000L) { // 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros
11+
return TimeUnit.MILLISECONDS;
12+
} else if (timestamp < 10000000000000000L) {
13+
return TimeUnit.MICROSECONDS;
14+
} else {
15+
return TimeUnit.NANOSECONDS;
16+
}
17+
}
18+
19+
static TimeUnit getTimestampUnits(TimeUnit hint, long timestamp) {
20+
if (hint == null) {
21+
return guessTimestampUnits(timestamp);
22+
} else {
23+
return hint;
24+
}
25+
}
26+
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.testcontainers.junit.jupiter.Container;
2828
import org.testcontainers.junit.jupiter.Testcontainers;
2929

30-
import java.io.IOException;
3130
import java.math.BigDecimal;
3231
import java.util.Calendar;
3332
import java.util.HashMap;
3433
import java.util.List;
3534
import java.util.Map;
3635
import java.util.Objects;
3736
import java.util.TimeZone;
37+
import java.util.concurrent.TimeUnit;
3838
import java.util.concurrent.atomic.AtomicInteger;
3939

4040
import static java.util.Collections.singletonMap;
@@ -221,6 +221,93 @@ public void testUpfrontTable_withSymbols() {
221221
}
222222

223223
@Test
224+
public void testTimestampUnitResolution_auto() {
225+
connect.kafka().createTopic(topicName, 1);
226+
Map<String, String> props = baseConnectorProps(topicName);
227+
props.put("value.converter.schemas.enable", "false");
228+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
229+
connect.configureConnector(CONNECTOR_NAME, props);
230+
assertConnectorTaskRunningEventually();
231+
232+
java.util.Date birth = new Calendar.Builder()
233+
.setTimeZone(TimeZone.getTimeZone("UTC"))
234+
.setDate(2022, 9, 23) // note: month is 0-based
235+
.setTimeOfDay(13, 53, 59, 123)
236+
.build().getTime();
237+
238+
long birthInMillis = birth.getTime();
239+
long birthInMicros = birthInMillis * 1000;
240+
long birthInNanos = birthInMicros * 1000;
241+
242+
243+
connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":" + birthInMillis + "}");
244+
connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthInMicros + "}");
245+
connect.kafka().produce(topicName, "baz", "{\"firstname\":\"Jack\",\"lastname\":\"Doe\",\"birth\":" + birthInNanos + "}");
246+
247+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n"
248+
+ "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n"
249+
+ "\"Jane\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n"
250+
+ "\"Jack\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n",
251+
"select firstname,lastname,timestamp from " + topicName);
252+
}
253+
254+
@Test
255+
public void testTimestampUnitResolution_millis() {
256+
testTimestampUnitResolution0("millis");
257+
}
258+
259+
@Test
260+
public void testTimestampUnitResolution_micros() {
261+
testTimestampUnitResolution0("micros");
262+
}
263+
264+
@Test
265+
public void testTimestampUnitResolution_nanos() {
266+
testTimestampUnitResolution0("nanos");
267+
}
268+
269+
private void testTimestampUnitResolution0(String mode) {
270+
TimeUnit unit;
271+
switch (mode) {
272+
case "nanos":
273+
unit = TimeUnit.NANOSECONDS;
274+
break;
275+
case "micros":
276+
unit = TimeUnit.MICROSECONDS;
277+
break;
278+
case "millis":
279+
unit = TimeUnit.MILLISECONDS;
280+
break;
281+
default:
282+
throw new IllegalArgumentException("Unknown mode: " + mode);
283+
}
284+
connect.kafka().createTopic(topicName, 1);
285+
Map<String, String> props = baseConnectorProps(topicName);
286+
props.put("value.converter.schemas.enable", "false");
287+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
288+
props.put(QuestDBSinkConnectorConfig.TIMESTAMP_UNITS_CONFIG, mode);
289+
connect.configureConnector(CONNECTOR_NAME, props);
290+
assertConnectorTaskRunningEventually();
291+
292+
long birthMillis = new Calendar.Builder()
293+
.setTimeZone(TimeZone.getTimeZone("UTC"))
294+
.setDate(2206, 10, 20) // note: month is 0-based
295+
.setTimeOfDay(17, 46, 39, 999)
296+
.build().getTime().getTime();
297+
298+
long birthTarget = unit.convert(birthMillis, MILLISECONDS);
299+
300+
connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":0}");
301+
connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthTarget + "}");
302+
303+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n"
304+
+ "\"John\",\"Doe\",\"1970-01-01T00:00:00.000000Z\"\r\n"
305+
+ "\"Jane\",\"Doe\",\"2206-11-20T17:46:39.999000Z\"\r\n",
306+
"select firstname,lastname,timestamp from " + topicName);
307+
}
308+
309+
310+
@Test
224311
public void testUpfrontTable() {
225312
connect.kafka().createTopic(topicName, 1);
226313
Map<String, String> props = baseConnectorProps(topicName);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.questdb.kafka;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import java.util.Calendar;
6+
import java.util.Date;
7+
import java.util.TimeZone;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import static org.junit.jupiter.api.Assertions.*;
11+
12+
public class TimestampHelperTest {
13+
14+
@Test
15+
public void testBoundaries_auto() {
16+
// 4/26/1970, 5:46:40 PM
17+
Date lowerBound = new Calendar.Builder()
18+
.setTimeZone(TimeZone.getTimeZone("UTC"))
19+
.setDate(1970, 3, 26) // note: month is 0-based
20+
.setTimeOfDay(17, 46, 40, 1)
21+
.build().getTime();
22+
23+
// 11/20/2286, 5:46:40 PM
24+
Date upperBound = new Calendar.Builder()
25+
.setTimeZone(TimeZone.getTimeZone("UTC"))
26+
.setDate(2286, 10, 20) // note: month is 0-based
27+
.setTimeOfDay(17, 46, 39, 999)
28+
.build().getTime();
29+
30+
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime()));
31+
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime()));
32+
33+
assertEquals(TimeUnit.MICROSECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime() * 1000));
34+
assertEquals(TimeUnit.MICROSECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime() * 1000));
35+
36+
assertEquals(TimeUnit.NANOSECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime() * 1000000));
37+
assertEquals(TimeUnit.NANOSECONDS, TimestampHelper.getTimestampUnits(null, Long.MAX_VALUE)); //upper bound in nanos does not fit in long
38+
}
39+
40+
@Test
41+
public void testBoundaries_explicit() {
42+
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, 0));
43+
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, Long.MAX_VALUE));
44+
45+
assertEquals(TimeUnit.MICROSECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MICROSECONDS, 0));
46+
assertEquals(TimeUnit.MICROSECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MICROSECONDS, Long.MAX_VALUE));
47+
48+
assertEquals(TimeUnit.NANOSECONDS, TimestampHelper.getTimestampUnits(TimeUnit.NANOSECONDS, 0));
49+
assertEquals(TimeUnit.NANOSECONDS, TimestampHelper.getTimestampUnits(TimeUnit.NANOSECONDS, Long.MAX_VALUE));
50+
}
51+
52+
}

kafka-questdb-connector-samples/stocks/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
<plugin>
8080
<groupId>org.springframework.boot</groupId>
8181
<artifactId>spring-boot-maven-plugin</artifactId>
82+
<version>${spring.boot.version}</version>
8283
<configuration>
8384
<mainClass>io.questdb.kafka.samples.StocksApp</mainClass>
8485
</configuration>

readme.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The connector supports following Options:
4444
| value.prefix | STRING | from_value | N/A | Prefix for value fields |
4545
| skip.unsupported.types | BOOLEAN | false | false | Skip unsupported types |
4646
| timestamp.field.name | STRING | pickup_time | N/A | Designated timestamp field name |
47+
| timestamp.units | STRING | micros | auto | Designated timestamp field units |
4748
| include.key | BOOLEAN | false | true | Include message key in target table |
4849
| symbols | STRING | instrument,stock | N/A | Comma separated list of columns that should be symbol type |
4950
| username | STRING | user1 | admin | User name for QuestDB. Used only when token is non-empty |
@@ -76,7 +77,7 @@ The connector will create a table with the following columns:
7677
| John | Doe | 30 | Main Street | New York |
7778

7879
## Designated Timestamps
79-
The connector supports designated timestamps. If the message contains a field with a timestamp, the connector can use it as a timestamp for the row. The field name must be configured using `timestamp.field.name` option. The field must either a simple number or a timestamp. When it's a simple number, the connector will interpret it as a Unix timestamp in milliseconds.
80+
The connector supports designated timestamps. If a message contains a field with a timestamp, the connector can use it as a timestamp for the row. The field name must be configured using `timestamp.field.name` option. The field must either a plain integer or being labelled as a timestamp in a message schema. When it's a plain integer, the connector will autodetect its units. This works for timestamps after 4/26/1970, 5:46:40 PM. The units can be also configured explicitly using `timestamp.units` option. Supported configuration values are `nanos`, `micros`, `millis` and `auto`.
8081

8182
## QuestDB Symbol Type
8283
QuestDB supports a special type called [Symbol](https://questdb.io/docs/concept/symbol/). This connector never creates a column with a type `SYMBOL`. Instead, it creates a column with a type `STRING`. If you want to use `SYMBOL` type, you can pre-create a table in QuestDB and use it as a target table.

0 commit comments

Comments
 (0)