Commit a2f0b35

committed: sample readme updated

1 parent e7c15fe commit a2f0b35

1 file changed: +122 -24 lines changed

kafka-questdb-connector-samples/stocks/readme.md

@@ -51,26 +51,95 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
13. Don't forget to stop the containers when you're done. The project generates a lot of data and you could run out of disk space.
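
If you used `docker-compose up` to start the sample, a quick way to stop and clean up is shown below; the `-v` flag is only needed if you also want to remove any volumes the stack may have created:

```shell
# stop and remove the sample's containers and networks
docker-compose down

# additionally remove associated volumes to reclaim disk space
docker-compose down -v
```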
## Project Internals
If you liked what you saw and want to learn more about the internals of the project, read on. It's time to demystify the black box. We will discuss these components:

1. Postgres and its schema
2. Java stock price updater
3. Debezium Postgres connector
4. Kafka QuestDB connector
5. QuestDB
6. Grafana

### Postgres
The docker-compose file starts a Postgres container. It uses a container image provided by the Debezium project, since they maintain a Postgres image which comes preconfigured for Debezium.

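For illustration, the Postgres service definition in the compose file looks roughly like the sketch below. The image tag, port mapping and healthcheck details are assumptions rather than an exact copy, but the credentials match the ones used in the connector configuration further down:

```yaml
  postgres:
    image: debezium/postgres               # Debezium-maintained Postgres image (tag assumed)
    ports:
      - "5432:5432"                        # assumed port mapping
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    healthcheck:                           # the producer waits for this via condition: service_healthy
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 10
```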
### Java stock price updater

It's a Spring Boot application which, during startup, creates a table in Postgres and populates it with initial data.
You can see the SQL executed in the [schema.sql](src/main/resources/schema.sql) file. The table always has one row per stock symbol.

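For reference, the Postgres `stock` table has the following schema:

```sql
create table if not exists stock (
    id serial primary key,
    symbol varchar(10) unique,
    price float8,
    last_update timestamp
);
```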
Once the application has started, it updates stock prices at regular intervals. The `price` and `last_update` columns are updated every time a new price is received for a stock symbol. It mimics a real-world scenario where you would have a Postgres table with the latest price for each stock symbol. Such a table would typically be used by a transactional system to look up the latest prices. In our case the transactional system is simulated by a [simple Java application](src/main/java/io/questdb/kafka/samples/StockService.java) which randomly updates the price of each stock symbol in the Postgres table. The application generates thousands of updates each second.

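Conceptually, every price tick issued by the updater boils down to an in-place UPDATE of the row for that symbol, along the lines of the sketch below; the literal values are made up, and the actual statement lives in the Java application:

```sql
-- overwrite the latest price and bump the last_update timestamp for one symbol
update stock
   set price = 9.87,
       last_update = now()
 where symbol = 'SNAP';
```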
The application is built and packaged as a container image when you execute `docker-compose build`. Inside the [docker-compose file](docker-compose.yml) you can see a container called `producer`. That's our Java application:
```yaml
  producer:
    image: kafka-questdb-connector-samples-stocks-generator
    build:
      dockerfile: ./Dockerfile
      context: .
    depends_on:
      postgres:
        condition: service_healthy
    links:
      - postgres:postgres
```
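The `./Dockerfile` referenced above is not reproduced here. A typical two-stage build for a Spring Boot application of this kind could look roughly like the sketch below; the base images, Java version and paths are assumptions, not the sample's actual Dockerfile:

```Dockerfile
# build stage: compile and package the Spring Boot app with Maven (assumed project layout)
FROM maven:3.8-eclipse-temurin-17 AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn -q -DskipTests package

# runtime stage: run the packaged jar on a plain JRE
FROM eclipse-temurin:17-jre
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]
```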
### Debezium Postgres connector

Debezium is an open-source project which provides connectors for various databases. It is used to capture changes from a database and feed them to a Kafka topic. In other words: whenever there is a change in a database table, Debezium will read the change and feed it to a Kafka topic. This way it translates operations such as INSERT or UPDATE into events which can be consumed by other systems. Debezium supports a wide range of databases. In this sample we use the Postgres connector.

The Debezium Postgres connector is implemented as a Kafka Connect source connector. Inside the [docker-compose file](docker-compose.yml) it's called `connect` and its container image is also built during `docker-compose build`. The [Dockerfile](../../Dockerfile-Samples) uses a Debezium image, which contains the Kafka Connect runtime and the Debezium connectors. Our Dockerfile amends it with the Kafka Connect QuestDB Sink.

What's important: when this container starts, it only connects to the Kafka broker; it does not start any connectors yet. We have to start the connectors ourselves using a `curl` command. This is how we started the Debezium connector:
```shell
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors
```
It uses the Kafka Connect REST interface to start a new connector with a given configuration. Let's have a closer look at the configuration. This is how it looks when formatted for readability:
```json
{
  "name": "debezium_source",
  "config": {
    "tasks.max": 1,
    "database.hostname": "postgres",
    "database.port": 5432,
    "database.user": "postgres",
    "database.password": "postgres",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1"
  }
}
```
Most of the fields are self-explanatory. The only non-obvious one is `database.server.name`. It's a logical, unique name of the database server, used by Kafka Connect to store offsets, so it must be unique for each database server: if you have multiple Postgres databases, you need a different `database.server.name` for each of them. Debezium also uses it to generate Kafka topic names, in the form `database.server.name`.`schema`.`table`. In our case that's `dbserver1.public.stock`.

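Once the POST succeeds, Kafka Connect keeps the connector running inside the `connect` container. You can check that it is registered and healthy through the same REST interface:

```shell
# list all connectors registered with this Kafka Connect instance
curl localhost:8083/connectors

# show the state of the Debezium source connector and its tasks
curl localhost:8083/connectors/debezium_source/status
```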
### Kafka QuestDB connector

The Kafka QuestDB connector re-uses the same Kafka Connect runtime as the Debezium connector. It's also started with a `curl` command. This is how we started the QuestDB connector:
```shell
curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
```
This is the connector JSON configuration nicely formatted:
```json
{
  "name": "questdb-connect",
  "config": {
    "topics": "dbserver1.public.stock",
    "table": "stock",
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "host": "questdb",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "include.key": "false",
    "symbols": "symbol",
    "timestamp.field.name": "last_update"
  }
}
```
Again, most of the fields are obvious. Let's focus on the non-obvious ones.

1. `"symbols": "symbol"` instructs the connector to use the [QuestDB symbol type](https://questdb.io/docs/concept/symbol/) for the column named "symbol". This column has low cardinality, so it's a good candidate for the symbol type.
2. `"timestamp.field.name": "last_update"` instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column.
3. `"transforms": "unwrap"` and `"transforms.unwrap.type"` instruct the connector to apply Debezium's ExtractNewRecordState transform.

Let's focus on the ExtractNewRecordState transform a bit more. Why is it needed at all? For every change in the Postgres table, Debezium emits a JSON message to a Kafka topic. Messages look like this:
```json
{
  "schema": {
@@ -117,7 +186,7 @@ You can see the `payload` field contains the actual change. Let's zoom it a bit
```
This is the actual change in a table. It's a JSON object which contains the new values for the columns in the Postgres table. Notice how the structure maps to the Postgres table schema described above.

We cannot feed the full change object to the Kafka Connect QuestDB Sink, because the sink would create a column for each field in the change object, including all metadata, for example the `source` part of the JSON:
```json
"source": {
  "version": "1.9.6.Final",
@@ -134,16 +203,45 @@ We cannot feed a full change object to Kafka Connect QuestDB Sink, because the s
  "xmin": null
},
```
We do not want to create columns in QuestDB for all this metadata. We only want to create columns for the actual data. This is where the `ExtractNewRecordState` transform comes to the rescue! It extracts only the actual new data from the overall change object and feeds just this small part to the QuestDB sink. The end result is that each INSERT and UPDATE in Postgres will insert a new row in QuestDB.

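After unwrapping, the sink effectively receives just the row data from the `after` part of the change event. For the `stock` table a record would look roughly like this; the values are illustrative and the exact encoding of `last_update` depends on Debezium's time handling settings:

```json
{
  "id": 42,
  "symbol": "SNAP",
  "price": 9.87,
  "last_update": 1666182524951000
}
```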
### QuestDB

QuestDB is a fast, open-source time-series database. It uses SQL for querying, and it adds a bit of syntactic sugar on top of SQL to make it easier to work with time-series data. It also implements the Postgres wire protocol, so many existing tools can connect to it.

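As a quick example of that syntactic sugar, once data is flowing you can ask for the most recent row per symbol in the QuestDB SQL console using `LATEST ON`. This assumes the sink created the `symbol`, `price` and `timestamp` columns used by the Grafana query below:

```sql
-- most recent price per stock symbol, using the designated timestamp column
SELECT symbol, price, timestamp
FROM stock
LATEST ON timestamp PARTITION BY symbol;
```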
### Grafana

Grafana is a popular open-source tool for visualizing time-series data. It can be used to visualize data from QuestDB. There is no native QuestDB datasource for Grafana, but there is a Postgres datasource, and since QuestDB speaks the Postgres wire protocol we can use it to connect to QuestDB. Grafana is provisioned with a dashboard that visualizes the data from QuestDB in a candlestick chart. The chart is configured to execute this query:
```sql
SELECT
  $__time(timestamp),
  min(price) as low,
  max(price) as high,
  first(price) as open,
  last(price) as close
FROM
  stock
WHERE
  $__timeFilter(timestamp)
  and symbol = '$Symbol'
SAMPLE BY $Interval ALIGN TO CALENDAR;
```
`$__time` is a Grafana macro that converts the timestamp column to the format expected by Grafana. `$__timeFilter` is another Grafana macro; it filters the data based on the time range selected in the dashboard. `$Symbol` and `$Interval` are dashboard variables: `$Symbol` selects the stock symbol and `$Interval` controls the granularity of the data.

Grafana will resolve the macros and execute queries similar to this:
```sql
SELECT
  timestamp AS "time",
  min(price) as low,
  max(price) as high,
  first(price) as open,
  last(price) as close
FROM
  stock
WHERE
  timestamp BETWEEN '2022-10-19T12:23:44.951Z' AND '2022-10-19T12:28:44.951Z'
  and symbol = 'SNAP'
SAMPLE BY 5s ALIGN TO CALENDAR;
```
And this is then used by the candlestick chart to visualize the data.
[Grafana Dashboard](http://localhost:3000/d/stocks/stocks?orgId=1&refresh=5s&viewPanel=2)
