
Commit eb449c4: "readme improved" (1 parent: 0b9884c)

2 files changed (+127, -10 lines)

kafka-questdb-connector-samples/stocks/readme.md

Lines changed: 117 additions & 0 deletions
@@ -1,3 +1,120 @@

# Sample Project: Feeding changes from Postgres to QuestDB

## What does this sample do?

This sample project demonstrates how to feed changes from a Postgres table to QuestDB. It uses the [Debezium Postgres connector](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html) to capture changes from a [Postgres database](https://www.postgresql.org/) and feed them to a [Kafka](https://kafka.apache.org/) topic. The [Kafka QuestDB connector](https://github.com/questdb/kafka-questdb-connector) then reads from the Kafka topic and writes the changes to a [QuestDB](https://questdb.io/) table. QuestDB is used for analytical queries on the data and to feed the data to a Grafana dashboard for visualization.

## Prerequisites

- Git
- Working Docker environment, including docker-compose
- Internet access to download dependencies

The project was tested on macOS with an M1 chip, but it should work on other platforms too. Please open a new issue if it's not working for you.

Bear in mind the sample starts multiple containers. It runs fine on my machines with 16 GB RAM, but chances are it will struggle on machines with less RAM.

## Running the sample

1. Clone this repository via `git clone https://github.com/questdb/kafka-questdb-connector.git`.
2. `cd kafka-questdb-connector/kafka-questdb-connector-samples/stocks/` to enter the directory with this sample.
3. Run `docker-compose build` to build Docker images with the sample project. This will take a few minutes.
4. Run `docker-compose up` to start Postgres, the Java stock price updater app, Apache Kafka, Kafka Connect with the Debezium and QuestDB connectors, QuestDB, and Grafana. This will take a few minutes.
5. The previous command will generate a lot of log messages. Eventually logging should cease. This means all containers are running.
6. Execute the following command to start the Debezium connector:
   ```shell
   curl -X POST -H "Content-Type: application/json" -d '{
     "name": "debezium_source",
     "config": {
       "tasks.max": 1,
       "database.hostname": "postgres",
       "database.port": 5432,
       "database.user": "postgres",
       "database.password": "postgres",
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "database.dbname": "postgres",
       "database.server.name": "dbserver1"
     }
   }' localhost:8083/connectors
   ```
   It starts the Debezium connector, which will capture changes from Postgres and feed them to Kafka.
7. Execute the following command to start the QuestDB Kafka Connect sink:
   ```shell
   curl -X POST -H "Content-Type: application/json" -d '{
     "name": "questdb-connect",
     "config": {
       "topics": "dbserver1.public.stock",
       "table": "stock",
       "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
       "tasks.max": "1",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "host": "questdb",
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "include.key": "false",
       "symbols": "symbol",
       "timestamp.field.name": "last_update"
     }
   }' localhost:8083/connectors
   ```
   It starts the QuestDB Kafka Connect sink, which will read changes from Kafka and write them to QuestDB.
8. Go to the [QuestDB Web Console](http://localhost:19000/) and execute the following query:
   ```sql
   select * from stock
   ```
   It should return some rows. If it does not return any rows, wait a few seconds and try again.
9. Go to the [Grafana Dashboard](http://localhost:3000/d/stocks/stocks?orgId=1&refresh=5s&viewPanel=2). It should show some data. If it does not show any data, wait a few seconds, refresh and try again.
10. Play with the Grafana dashboard a bit. You can change the aggregation interval, zoom in and out, etc.
11. Go to the [QuestDB Web Console](http://localhost:19000/) again and execute the following query:
    ```sql
    SELECT
      timestamp,
      symbol,
      avg(price),
      min(price),
      max(price)
    FROM stock
    WHERE symbol = 'IBM'
    SAMPLE BY 1m ALIGN TO CALENDAR;
    ```
    It returns the average, minimum and maximum stock price for IBM for each minute. You can change the `1m` to `1s` to get data aggregated by second. `SAMPLE BY` is a bit of QuestDB syntactic sugar that makes time-related queries more readable.
12. Don't forget to stop the containers when you're done. The project generates a lot of data and you could run out of disk space.
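
To build intuition for what the `SAMPLE BY 1m ALIGN TO CALENDAR` query in step 11 computes, here is a rough Python equivalent of the per-minute aggregation. This is purely illustrative; the function and sample rows are made up and are not part of the project:

```python
from collections import defaultdict

def sample_by_minute(rows):
    """Group (epoch_seconds, symbol, price) rows into calendar-aligned
    1-minute buckets and compute (avg, min, max) per bucket,
    roughly what SAMPLE BY 1m ALIGN TO CALENDAR does in QuestDB."""
    buckets = defaultdict(list)
    for ts, symbol, price in rows:
        bucket = ts - ts % 60  # align the bucket start to a calendar minute
        buckets[(bucket, symbol)].append(price)
    return {
        key: (sum(p) / len(p), min(p), max(p))
        for key, p in buckets.items()
    }

rows = [
    (0, "IBM", 10.0),   # minute 0
    (30, "IBM", 20.0),  # minute 0
    (90, "IBM", 40.0),  # minute 1
]
print(sample_by_minute(rows))
```

The real query does this on the server, over the `timestamp` designated column, without materializing per-bucket lists.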

## Project Internals

The Postgres table has the following schema:
```sql
create table if not exists stock (
    id serial primary key,
    symbol varchar(10) unique,
    price float8,
    last_update timestamp
);
```
The table always has exactly one row per stock symbol. The `price` and `last_update` columns are updated every time a new price is received for the stock symbol. This mimics a real-world scenario where you have a Postgres table with the latest price for each stock symbol. Such a table would typically be used by a transactional system to get the latest prices. In our case the transactional system is simulated by a [simple Java application](src/main/java/io/questdb/kafka/samples/StockService.java) which randomly updates the price of each stock symbol in the Postgres table. The application generates thousands of updates each second.
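
The actual updater is the Java `StockService` linked above. As a rough illustration of the idea only (the function name, update rule, and starting prices here are hypothetical, not taken from the project), a random-walk price updater might look like:

```python
import random

def update_prices(prices, max_move=0.01):
    """Apply one round of random price updates, one per symbol,
    mimicking the UPDATE statements the Java app issues against Postgres."""
    for symbol in prices:
        # nudge each price by up to +/- 1%, keeping it positive
        move = 1 + random.uniform(-max_move, max_move)
        prices[symbol] = max(0.01, prices[symbol] * move)
    return prices

prices = {"AAPL": 500.0, "IBM": 50.0, "NFLX": 500.0}
for _ in range(1000):  # the real app generates thousands of updates per second
    update_prices(prices)
print(prices)
```

Each such update becomes one change event in the pipeline described below.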

Then we have a pipeline which reads the changes from the Postgres table and feeds them to a QuestDB table. The pipeline is composed of the following components:
- [Debezium Postgres connector](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html), which reads changes from the Postgres table and feeds them to a Kafka topic.
- Kafka QuestDB connector, which reads changes from the Kafka topic and feeds them to a [QuestDB](https://questdb.io) table.
- QuestDB SQL console, which is used to query the QuestDB table.
- [Grafana](https://grafana.com/), which is used to visualize the data in the QuestDB table.

Debezium is an open source project which provides connectors for various databases. It is used to capture changes from a database and feed them to a Kafka topic. In other words: whenever there is a change in a database table, Debezium will read the change and feed it to a Kafka topic. This way it translates operations such as INSERT or UPDATE into events which can be consumed by other systems. Debezium supports a wide range of databases; in this sample we use the Postgres connector. Debezium is technically implemented as a Kafka Connect source.
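
Debezium tags each event with an `op` code in the payload (`c` for create/insert, `u` for update, `d` for delete, `r` for snapshot reads). A consumer can translate these back into operation names; a minimal sketch (the `describe` helper is made up for illustration, the op codes are standard Debezium behavior):

```python
# Standard Debezium operation codes found in payload.op
OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}

def describe(payload):
    """Summarize a Debezium change event payload in one line."""
    op = OPS.get(payload["op"], "UNKNOWN")
    # updates/inserts carry the new row in "after", deletes in "before"
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f'{op} on {payload["source"]["table"]}: {row}'

event = {"op": "u", "before": None,
         "after": {"id": 8, "symbol": "NFLX"},
         "source": {"table": "stock"}}
print(describe(event))
```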

For every change in the Postgres table, Debezium emits a JSON message to a Kafka topic. Messages look like this:
```json
{
  "schema": {
    "comment": "this contains the Debezium message schema, it's not very relevant for this sample"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 8,
      "symbol": "NFLX",
      "price": 1544.3357414199545,
      "last_update": 1666172978269856
    },
    "source": {
      "version": "1.9.6.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1666172978272,
      "snapshot": "false",
      "db": "postgres",
      "sequence": "[\"87397208\",\"87397208\"]",
      "schema": "public",
      "table": "stock",
      "txId": 402087,
      "lsn": 87397208,
      "xmin": null
    },
    "op": "u",
    "ts_ms": 1666172978637,
    "transaction": null
  }
}
```

You can see the `payload` field contains the actual change. Let's zoom in a bit and focus on this part of the JSON:
```json
[...]
"after": {
  "id": 8,
  "symbol": "NFLX",
  "price": 1544.3357414199545,
  "last_update": 1666172978269856
},
[...]
```
This is the actual change in the table: a JSON object which contains the new values for the columns in the Postgres table. Notice how the structure maps to the Postgres table schema described above.
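
The `ExtractNewRecordState` transform configured on the QuestDB sink does essentially this unwrapping before the record reaches the connector. A simplified Python imitation (the transform itself is Debezium's; this function is just an illustration of its effect on the message):

```python
import json

def unwrap(message: str) -> dict:
    """Extract payload.after from a Debezium envelope, similar to what
    the ExtractNewRecordState single message transform does."""
    envelope = json.loads(message)
    return envelope["payload"]["after"]

# abbreviated version of the Debezium message shown above
msg = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"id": 8, "symbol": "NFLX", "price": 1544.3357414199545,
                  "last_update": 1666172978269856},
        "op": "u"
    }
})
print(unwrap(msg))
```

After unwrapping, the sink maps the remaining flat fields to QuestDB columns, with `symbol` stored as a QuestDB symbol and `last_update` as the designated timestamp, per the connector config.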

Postgres -> Kafka:

kafka-questdb-connector-samples/stocks/src/main/resources/schema.sql

Lines changed: 10 additions & 10 deletions
@@ -4,13 +4,13 @@ create table if not exists stock (
```diff
   price float8,
   last_update timestamp
 );
-insert into stock (symbol, price, last_update) values ('AAPL', 500.0, now());
-insert into stock (symbol, price, last_update) values ('IBM', 50.0, now());
-insert into stock (symbol, price, last_update) values ('MSFT', 100.0, now());
-insert into stock (symbol, price, last_update) values ('GOOG', 1000.0, now());
-insert into stock (symbol, price, last_update) values ('FB', 200.0, now());
-insert into stock (symbol, price, last_update) values ('AMZN', 1000.0, now());
-insert into stock (symbol, price, last_update) values ('TSLA', 500.0, now());
-insert into stock (symbol, price, last_update) values ('NFLX', 500.0, now());
-insert into stock (symbol, price, last_update) values ('TWTR', 50.0, now());
-insert into stock (symbol, price, last_update) values ('SNAP', 10.0, now());
+insert into stock (symbol, price, last_update) values ('AAPL', 500.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('IBM', 50.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('MSFT', 100.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('GOOG', 1000.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('FB', 200.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('AMZN', 1000.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('TSLA', 500.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('NFLX', 500.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('TWTR', 50.0, now()) ON CONFLICT DO NOTHING;
+insert into stock (symbol, price, last_update) values ('SNAP', 10.0, now()) ON CONFLICT DO NOTHING;
```
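
The switch to `ON CONFLICT DO NOTHING` makes the seed script idempotent: re-running it against an already seeded database no longer fails on the `unique` constraint on `symbol`. SQLite (3.24+) supports the same clause, so the effect can be demonstrated in a few self-contained lines of Python (a standalone illustration, not part of the project):

```python
import sqlite3

con = sqlite3.connect(":memory:")  # requires SQLite 3.24+ for upsert syntax
con.execute("create table stock (symbol text unique, price real)")

seed = "insert into stock (symbol, price) values ('AAPL', 500.0) ON CONFLICT DO NOTHING"
con.execute(seed)
con.execute(seed)  # second run is silently skipped instead of raising an error

count = con.execute("select count(*) from stock").fetchone()[0]
print(count)
```

Without the clause, the second insert would abort with a unique-constraint violation, which is exactly what happened when the sample's containers were restarted against an existing Postgres volume.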
