
Commit 1bf7483

improve docs
1 parent ef0ee62 commit 1bf7483

1 file changed: +35, -7 lines


readme.md

Lines changed: 35 additions & 7 deletions
@@ -14,7 +14,13 @@ This guide assumes you are already familiar with Apache Kafka and Kafka Connect.
     "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
     "host": "localhost:9009",
     "topics": "Orders",
-    "table": "orders_table"
+    "table": "orders_table",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "transforms": "unwrap",
+    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+    "include.key": "false",
+    "timestamp.field.name": "created_at"
   }
 }
 ```
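For context, with the configuration added above the connector reads plain JSON values and uses their `created_at` field as the designated timestamp. A value record might look like the following sketch; every field except `created_at` is hypothetical, and `created_at` is assumed to be a Unix epoch in milliseconds:

```json
{
  "id": 1001,
  "symbol": "AAPL",
  "quantity": 10,
  "created_at": 1693526400000
}
```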
@@ -72,15 +78,37 @@ When a target table does not exist in QuestDB then it will be automatically crea
 In production, it's recommended to [create tables manually via SQL](https://questdb.io/docs/reference/sql/create-table/). This gives you more control over the table schema and allows using the symbol type, creating indexes, etc.
 
 ## FAQ
-Q: Does this connector work with Schema Registry?
+<b>Q</b>: Does this connector work with Schema Registry?
 
-A: The connector does not care about the serialization strategy used. It relies on Kafka Connect converters to deserialize data. Converters can be configured using the `key.converter` and `value.converter` options; see the configuration section.
+<b>A</b>: The connector does not care about the serialization strategy used. It relies on Kafka Connect converters to deserialize data. Converters can be configured using the `key.converter` and `value.converter` options; see the configuration section.
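For illustration, a registry-aware converter setup might look like the following sketch, using Confluent's Avro converter; the registry URL is a placeholder and the converter choice is an assumption, not part of this connector's requirements:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```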

-Q: I'm getting this error: `org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.`
+<b>Q</b>: I'm getting this error: `org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.`
 
-A: This error means that the connector is trying to deserialize data using a converter that expects a schema. The connector does not use schemas, so you need to configure the converter to not expect a schema. For example, if you are using the JSON converter, you need to set `value.converter.schemas.enable=false` or `key.converter.schemas.enable=false` in the connector configuration.
+<b>A</b>: This error means that the connector is trying to deserialize data using a converter that expects a schema. The connector does not use schemas, so you need to configure the converter to not expect a schema. For example, if you are using the JSON converter, you need to set `value.converter.schemas.enable=false` or `key.converter.schemas.enable=false` in the connector configuration.
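Concretely, the fix described in this answer amounts to adding one converter option, as in this minimal sketch for plain JSON values:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```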
 
-Q: Does this connector work with Debezium?
+<b>Q</b>: Does this connector work with Debezium?
 
-A: Yes, it's been tested with Debezium as a source. Bear in mind that QuestDB is meant to be used as an append-only database, hence updates should be translated into new inserts.
+<b>A</b>: Yes, it's been tested with Debezium as a source. Bear in mind that QuestDB is meant to be used as an append-only database, hence updates should be translated into new inserts. The connector supports Debezium's `ExtractNewRecordState` transformation to extract the new state of the record. The transform drops DELETE events by default, so there is no need to handle them explicitly.
+
+<b>Q</b>: How can I select which fields to include in the target table?
+
+<b>A</b>: Use the ReplaceField transformation to remove unwanted fields. For example, if you want to remove the `address` field from the example above, you can use the following configuration:
+```json
+{
+  "name": "questdb-sink",
+  "config": {
+    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
+    "host": "localhost:9009",
+    "topics": "Orders",
+    "table": "orders_table",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "transforms": "unwrap,removeAddress",
+    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+    "transforms.removeAddress.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.removeAddress.blacklist": "address",
+    "include.key": "false"
+  }
+}
+```
+See [ReplaceField documentation](https://docs.confluent.io/platform/current/connect/transforms/replacefield.html#replacefield) for more details.
