# Sample Project: Kafka -> QuestDB
## What does the sample project do?
This code sample shows the simplest usage of the Kafka QuestDB connector. It uses a simple Node.js application to generate random data and send it to a Kafka topic. The connector is configured to read from the topic and write the data to a QuestDB table.

## Prerequisites
- Git
- Working Docker environment, including docker-compose
- Internet access to download dependencies

The project was tested on macOS with an M1 chip, but it should work on other platforms too. Please open a new issue if it does not work for you.

## Usage
1. Clone this repository via `git clone https://github.com/questdb/kafka-questdb-connector.git`
2. Run `cd kafka-questdb-connector/kafka-questdb-connector-samples/faker/` to enter the directory with this sample.
3. Run `docker-compose build` to build a Docker image with the sample project.
4. Run `docker-compose up` to start the Node.js producer, Apache Kafka and QuestDB containers.
5. The previous command will generate a lot of log messages. Eventually the logging should cease, which means both Apache Kafka and QuestDB are running. The last log message should contain the text `Session key updated`.
6. Execute the following command in a shell:
   ```shell
   $ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
   ```
7. The command above creates a new Kafka connector that reads data from the `People` topic and writes it to a QuestDB table called `People`. The connector also converts the `birthday` field to a timestamp.
8. Go to the [QuestDB console](http://localhost:19000), run `select * from 'People';` and you should see some rows.
9. Congratulations! You have successfully created a Kafka connector that reads data from a Kafka topic and writes it to a QuestDB table!
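
If the table stays empty, you can ask Kafka Connect about the state of the connector via its REST API; the connector name `questdb-connect` comes from the configuration submitted in step 6:

```shell
$ curl localhost:8083/connectors/questdb-connect/status
```

The response should report the connector and its single task as `RUNNING`.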

## How does it work?
The sample project consists of 3 components:
1. A [Node.js](index.js) application that generates random JSON data and sends it to a Kafka topic. Each generated JSON looks similar to this (a rough sketch of such a producer follows this list):
   ```json
   {"firstname":"John","lastname":"Doe","birthday":"1970-01-12T10:25:12.052Z"}
   ```
2. A [Docker-compose](docker-compose.yml) file that starts the Node.js application, Apache Kafka, Kafka Connect and QuestDB containers.
3. A Kafka Connect configuration that defines the connector which reads data from the Kafka topic and writes it to a QuestDB table. The configuration is submitted to the running Kafka Connect cluster via its REST API.
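
The producer itself lives in [index.js](index.js). For illustration only, a minimal producer along these lines could look roughly like the sketch below; the library choices (`kafkajs`, `@faker-js/faker`), the `kafka:9092` broker address and the field generators are assumptions made for the sketch, not necessarily what the sample actually uses.

```js
// Sketch of a producer that sends random people to the People topic.
// Library names, broker address and field generators are illustrative assumptions.
const { Kafka } = require('kafkajs');
const { faker } = require('@faker-js/faker');

const kafka = new Kafka({ clientId: 'faker-producer', brokers: ['kafka:9092'] });
const producer = kafka.producer();

async function run() {
  await producer.connect();
  // Emit one random person per second, in the same shape as the JSON example above.
  setInterval(async () => {
    const person = {
      firstname: faker.person.firstName(),
      lastname: faker.person.lastName(),
      birthday: faker.date.past({ years: 50 }).toISOString(),
    };
    await producer.send({ topic: 'People', messages: [{ value: JSON.stringify(person) }] });
  }, 1000);
}

run().catch(console.error);
```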

The Kafka Connect configuration might look complex, but it's actually quite simple. Let's have a closer look. This is what the `curl` command looks like:
```shell
$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
```

It uses `curl` to submit the following JSON to Kafka Connect:
```json
{
  "name": "questdb-connect",
  "config": {
    "topics": "People",
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "host": "questdb",
    "timestamp.field.name": "birthday",
    "transforms": "convert_birthday",
    "transforms.convert_birthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_birthday.target.type": "Timestamp",
    "transforms.convert_birthday.field": "birthday",
    "transforms.convert_birthday.format": "yyyy-MM-dd'T'HH:mm:ss.SSSX"
  }
}
```
Most of the fields are self-explanatory. The `transforms` field is a bit more complex: it defines a [Kafka Connect transformation](https://docs.confluent.io/platform/current/connect/transforms/index.html) that converts the `birthday` string field to a timestamp. The `transforms.convert_birthday.field` field names the field to be converted, `transforms.convert_birthday.target.type` defines the type of the field after the conversion (a timestamp in this case), and `transforms.convert_birthday.format` defines the format of the date as a [Java SimpleDateFormat](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) pattern.
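
As a concrete example, this is how the pattern lines up with the sample `birthday` value generated by the producer: `yyyy-MM-dd` covers the date, `'T'` is the quoted literal separator, `HH:mm:ss.SSS` the time including milliseconds, and `X` the ISO 8601 time zone designator (`Z` for UTC).

```
pattern:  yyyy-MM-dd'T'HH:mm:ss.SSSX
example:  1970-01-12T10:25:12.052Z
```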
|
The other potentially non-obvious configuration elements are:
1. `"value.converter.schemas.enable": "false"` disables schema support in the Kafka Connect JSON converter. The sample project doesn't use schemas.
2. `"host": "questdb"` defines the hostname of the QuestDB instance. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file.
3. `"timestamp.field.name": "birthday"` defines the name of the field that should be used as a timestamp. It uses the field converted by the Kafka Connect transformation described above.
4. The ugly value in `"transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"`. The funny-looking part `'"'"'` is a way to pass a literal apostrophe through a shell command that already uses apostrophes to delimit the string: it closes the single-quoted string, adds an apostrophe inside double quotes, and opens a new single-quoted string. The apostrophes are needed because SimpleDateFormat requires the literal `T` to be quoted, so the actual date format is `yyyy-MM-dd'T'HH:mm:ss.SSSX`. If you know a better way to submit the same JSON then please [open a new issue](https://github.com/questdb/kafka-questdb-connector/issues/new); one alternative is sketched below.
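
A common way to sidestep the quoting gymnastics (not used by this sample; `connector.json` is just an example file name) is to save the JSON shown above into a file and let `curl` read it from there. Inside the file the format can be written plainly as `yyyy-MM-dd'T'HH:mm:ss.SSSX`:

```shell
$ curl -X POST -H "Content-Type: application/json" -d @connector.json localhost:8083/connectors
```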