Commit 858c476

wip

1 parent eb449c4

File tree

  • kafka-questdb-connector-samples/stocks/readme.md

1 file changed: +21 -3 lines changed

kafka-questdb-connector-samples/stocks/readme.md

Lines changed: 21 additions & 3 deletions
@@ -17,7 +17,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
 3. Run `docker-compose build` to build docker images with the sample project. This will take a few minutes.
 4. Run `docker-compose up` to start Postgres, Java stock price updater app, Apache Kafka, Kafka Connect with Debezium and QuestDB connectors, QuestDB and Grafana. This will take a few minutes.
 5. The previous command will generate a lot of log messages. Eventually logging should cease. This means all containers are running.
-6. Execute following command to start Debezium connector:
+6. In a separate shell, execute the following command to start the Debezium connector:
 ```shell
 curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors
 ```
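If the registration succeeds, Kafka Connect echoes the connector configuration back as JSON. As a quick sanity check you can also list the registered connectors and query the connector's status; these endpoints are not shown in the sample, but they are part of the standard Kafka Connect REST API:

```shell
# List all connectors registered with Kafka Connect
curl localhost:8083/connectors

# Show the status of the Debezium source connector and its tasks
curl localhost:8083/connectors/debezium_source/status
```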
@@ -33,7 +33,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
 ```
 It should return some rows. If it does not return any rows, wait a few seconds and try again.
 9. Go to [Grafana Dashboard](http://localhost:3000/d/stocks/stocks?orgId=1&refresh=5s&viewPanel=2). It should show some data. If it does not show any data, wait a few seconds, refresh and try again.
-10. Play with the Grafana dashboard a bit. You can change the aggregation interval, zoom-in and zoom-out, etc.
+10. Play with the Grafana dashboard a bit. You can change the aggregation interval, change the stock, zoom in and zoom out, etc.
 11. Go to [QuestDB Web Console](http://localhost:19000/) again and execute the following query:
 ```sql
 SELECT
@@ -46,7 +46,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
 where symbol = 'IBM'
 SAMPLE by 1m align to calendar;
 ```
-It returns the average, minimum and maximum stock price for IBM for each minute. You can change the `1m` to `1s` to get data aggregated by second. The `SAMPLE by` shows a bit of QuestDB syntax sugar to make time-related queries more readable.
+It returns the average, minimum and maximum stock price for IBM in each minute. You can change the `1m` to `1s` to get the data aggregated by second. The `SAMPLE BY` clause is a bit of QuestDB syntactic sugar that makes time-related queries more readable.
 12. Don't forget to stop the containers when you're done. The project generates a lot of data and you could run out of disk space.

 ## Project Internals
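The hunk above only shows the tail of the query. A full per-second variant might look like the sketch below; it assumes the sample writes into a `stock` table with `timestamp` and `price` columns, which this excerpt does not confirm:

```sql
-- Per-second aggregation for IBM (table and column names are assumed)
SELECT
    timestamp,
    avg(price) AS avg_price,
    min(price) AS min_price,
    max(price) AS max_price
FROM stock
WHERE symbol = 'IBM'
SAMPLE BY 1s ALIGN TO CALENDAR;
```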
@@ -116,6 +116,24 @@ You can see the `payload` field contains the actual change. Let's zoom it a bit
 ```
 This is the actual change in a table. It's a JSON object which contains the new values for the columns in the Postgres table. Notice how the structure maps to the Postgres table schema described above.

+We cannot feed a full change object to the Kafka Connect QuestDB Sink, because the sink would create a column for each field in the change object, including all the metadata, such as the `source` part of the JSON:
+```json
+"source": {
+  "version": "1.9.6.Final",
+  "connector": "postgresql",
+  "name": "dbserver1",
+  "ts_ms": 1666172978272,
+  "snapshot": "false",
+  "db": "postgres",
+  "sequence": "[\"87397208\",\"87397208\"]",
+  "schema": "public",
+  "table": "stock",
+  "txId": 402087,
+  "lsn": 87397208,
+  "xmin": null
+},
+```
+We do not want to create columns in QuestDB for all this metadata. We only want to create columns for the actual data. Debezium comes to the rescue: it ships with a Kafka Connect transform which can extract the actual data from the change object and feed it to the Kafka Connect sink. The transform is called `ExtractNewRecordState`.

 Postgres -> Kafka:
 ```shell
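For reference, wiring `ExtractNewRecordState` into a sink is normally just a matter of adding a transform entry to the sink connector's configuration. The fragment below uses Debezium's documented transform class; the exact QuestDB sink configuration used by this sample is not shown in this diff:

```json
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
```

With the transform applied, only the `after` state of each change event reaches the sink, so QuestDB ends up with columns for the actual table data rather than the Debezium metadata.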
