Skip to content

Commit e9aa797

Browse files
committed
option to exclude key from target table table
1 parent a0d284c commit e9aa797

File tree

3 files changed

+51
-9
lines changed

3 files changed

+51
-9
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
2323
public static final String SKIP_UNSUPPORTED_TYPES_CONFIG = "skip.unsupported.types";
2424
private static final String SKIP_UNSUPPORTED_TYPES_DOC = "Skip unsupported types";
2525

26-
public static final String DESIGNATED_TIMESTAMP_COLUMN_NAME = "timestamp.field.name";
26+
public static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.field.name";
2727
private static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC = "Designated timestamp field name";
2828

29+
public static final String INCLUDE_KEY_CONFIG = "include.key";
30+
private static final String INCLUDE_KEY_DOC = "Include key in the table";
31+
2932
public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
3033
super(config, parsedConfig);
3134
}
@@ -41,7 +44,8 @@ public static ConfigDef conf() {
4144
.define(KEY_PREFIX_CONFIG, Type.STRING, "key", Importance.MEDIUM, KEY_PREFIX_DOC)
4245
.define(VALUE_PREFIX_CONFIG, Type.STRING, "", Importance.MEDIUM, VALUE_PREFIX_DOC)
4346
.define(SKIP_UNSUPPORTED_TYPES_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, SKIP_UNSUPPORTED_TYPES_DOC)
44-
.define(DESIGNATED_TIMESTAMP_COLUMN_NAME, Type.STRING, null, Importance.MEDIUM, DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC);
47+
.define(DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, Type.STRING, null, Importance.MEDIUM, DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC)
48+
.define(INCLUDE_KEY_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, INCLUDE_KEY_DOC);
4549
}
4650

4751
public String getHost() {
@@ -65,6 +69,10 @@ public boolean isSkipUnsupportedTypes() {
6569
}
6670

6771
public String getDesignatedTimestampColumnName() {
68-
return getString(DESIGNATED_TIMESTAMP_COLUMN_NAME);
72+
return getString(DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG);
73+
}
74+
75+
public boolean isIncludeKey() {
76+
return getBoolean(INCLUDE_KEY_CONFIG);
6977
}
7078
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ private void handleSingleRecord(SinkRecord record) {
5959
sender.table(tableName);
6060

6161
//todo: detect duplicated columns
62-
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
62+
if (config.isIncludeKey()) {
63+
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
64+
}
6365
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
6466

6567
if (timestampColumnValue == null) {

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import static java.util.Collections.singletonMap;
4141
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4242
import static java.util.concurrent.TimeUnit.SECONDS;
43-
import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
4443
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
4544
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
4645

@@ -63,7 +62,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {
6362
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
6463

6564
@BeforeEach
66-
public void setUp() throws IOException {
65+
public void setUp() {
6766
topicName = newTopicName();
6867
JsonConverter jsonConverter = new JsonConverter();
6968
jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()));
@@ -120,7 +119,7 @@ public void testDesignatedTimestamp_noSchema_unixEpochMillis() {
120119
connect.kafka().createTopic(topicName, 1);
121120
Map<String, String> props = baseConnectorProps(topicName);
122121
props.put("value.converter.schemas.enable", "false");
123-
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME, "birth");
122+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
124123
connect.configureConnector(CONNECTOR_NAME, props);
125124
assertConnectorTaskRunningEventually();
126125

@@ -141,7 +140,7 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp
141140
props.put("transforms.convert_birth.target.type", "Timestamp");
142141
props.put("transforms.convert_birth.field", "birth");
143142
props.put("transforms.convert_birth.format", "yyyy-MM-dd'T'HH:mm:ss.SSSX");
144-
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME, "birth");
143+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
145144
connect.configureConnector(CONNECTOR_NAME, props);
146145
assertConnectorTaskRunningEventually();
147146

@@ -156,7 +155,7 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp
156155
public void testDesignatedTimestamp_withSchema() {
157156
connect.kafka().createTopic(topicName, 1);
158157
Map<String, String> props = baseConnectorProps(topicName);
159-
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME, "birth");
158+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
160159
connect.configureConnector(CONNECTOR_NAME, props);
161160
assertConnectorTaskRunningEventually();
162161
Schema schema = SchemaBuilder.struct().name("com.example.Person")
@@ -184,6 +183,39 @@ public void testDesignatedTimestamp_withSchema() {
184183
"select * from " + topicName);
185184
}
186185

186+
@Test
187+
public void testDoNotIncludeKey() {
188+
connect.kafka().createTopic(topicName, 1);
189+
Map<String, String> props = baseConnectorProps(topicName);
190+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth");
191+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
192+
connect.configureConnector(CONNECTOR_NAME, props);
193+
assertConnectorTaskRunningEventually();
194+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
195+
.field("firstname", Schema.STRING_SCHEMA)
196+
.field("lastname", Schema.STRING_SCHEMA)
197+
.field("birth", Timestamp.SCHEMA)
198+
.build();
199+
200+
java.util.Date birth = new Calendar.Builder()
201+
.setTimeZone(TimeZone.getTimeZone("UTC"))
202+
.setDate(2022, 9, 23) // note: month is 0-based
203+
.setTimeOfDay(13, 53, 59, 123)
204+
.build().getTime();
205+
206+
Struct struct = new Struct(schema)
207+
.put("firstname", "John")
208+
.put("lastname", "Doe")
209+
.put("birth", birth);
210+
211+
212+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
213+
214+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n"
215+
+ "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n",
216+
"select * from " + topicName);
217+
}
218+
187219
@Test
188220
public void testJsonNoSchema() {
189221
connect.kafka().createTopic(topicName, 1);

0 commit comments

Comments
 (0)