Skip to content

Commit 9739b58

Browse files
committed
wip: Debezium sample project
1 parent 4a5a9cb commit 9739b58

File tree

15 files changed

+390
-16
lines changed

15 files changed

+390
-16
lines changed
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ COPY ./integration-tests/cp-server/pom.xml /opt/integration-tests/cp-server/pom.
77
COPY ./integration-tests/commons/pom.xml /opt/integration-tests/commons/pom.xml
88
COPY ./integration-tests/debezium/pom.xml /opt/integration-tests/debezium/pom.xml
99
COPY ./kafka-questdb-connector-samples/pom.xml /opt/kafka-questdb-connector-samples/pom.xml
10+
COPY ./kafka-questdb-connector-samples/stocks/pom.xml /opt/kafka-questdb-connector-samples/stocks/pom.xml
1011
RUN mvn -B -f ./pom.xml -pl connector dependency:go-offline
1112

1213
FROM dep-cache AS builder
@@ -15,5 +16,5 @@ COPY ./connector/src /opt/connector/src
1516
RUN mvn install -pl connector -Dmaven.test.skip
1617
RUN cd /opt/connector/target && tar xzvf ./kafka-questdb-connector-*-bin.tar.gz
1718

18-
FROM confluentinc/cp-kafka-connect:7.2.1
19-
COPY --from=builder /opt/connector/target/kafka-questdb-connector/* /usr/share/java/kafka/
19+
FROM debezium/connect:1.9.6.Final
20+
COPY --from=builder /opt/connector/target/kafka-questdb-connector/* /kafka/connect/questdb-connector/

kafka-questdb-connector-samples/faker/docker-compose.yml

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ services:
3939
- kafka:kafka
4040
- questdb:questdb
4141
connect:
42-
image: kafka-questdb-connector-samples-faker-connect
42+
image: kafka-questdb-connector-samples-connect
4343
build:
44-
dockerfile: ./Dockerfile-Sample-Faker
44+
dockerfile: ./Dockerfile-Samples
4545
context: ./../../
4646
ports:
4747
- "8083:8083"
@@ -50,15 +50,7 @@ services:
5050
links:
5151
- kafka:kafka
5252
environment:
53-
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
54-
CONNECT_GROUP_ID: "default"
55-
CONNECT_OFFSET_STORAGE_TOPIC: "connect-storage-topic"
56-
CONNECT_CONFIG_STORAGE_TOPIC: "connect-config-topic"
57-
CONNECT_STATUS_STORAGE_TOPIC: "connect-status-topic"
58-
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
59-
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
60-
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
61-
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
62-
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
63-
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
64-
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
53+
BOOTSTRAP_SERVERS: "kafka:9092"
54+
CONFIG_STORAGE_TOPIC: "debezium_connect_config"
55+
OFFSET_STORAGE_TOPIC: "debezium_connect_offsets"
56+
STATUS_STORAGE_TOPIC: "debezium_connect_status"

kafka-questdb-connector-samples/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@
1111

1212
<artifactId>kafka-questdb-connector-samples</artifactId>
1313
<packaging>pom</packaging>
14+
15+
<modules>
16+
<module>stocks</module>
17+
</modules>
1418
</project>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
FROM maven:3.8-jdk-11-slim AS builder
2+
COPY ./pom.xml /opt/stocks/pom.xml
3+
COPY ./src ./opt/stocks/src
4+
WORKDIR /opt/stocks
5+
RUN mvn clean install -DskipTests
6+
7+
FROM azul/zulu-openjdk:11-latest
8+
COPY --from=builder /opt/stocks/target/kafka-samples-stocks-*.jar /stocks.jar
9+
CMD ["java", "-jar", "/stocks.jar"]
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
version: '2.1'
2+
services:
3+
questdb:
4+
image: questdb/questdb:6.5.3
5+
expose:
6+
- "9009"
7+
ports:
8+
- "19000:9000"
9+
environment:
10+
- QDB_CAIRO_COMMIT_LAG=1000
11+
- JAVA_OPTS=-Djava.locale.providers=JRE,SPI
12+
- QDB_LINE_DEFAULT_PARTITION_BY=YEAR
13+
zookeeper:
14+
image: zookeeper:3.6.2
15+
ports:
16+
- "2181:2181"
17+
kafka:
18+
image: wurstmeister/kafka:2.13-2.8.1
19+
ports:
20+
- "9092:9092"
21+
depends_on:
22+
- zookeeper
23+
links:
24+
- zookeeper:zookeeper
25+
environment:
26+
KAFKA_ADVERTISED_HOST_NAME: "kafka"
27+
KAFKA_ADVERTISED_PORT: "9092"
28+
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
29+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
30+
KAFKA_CREATE_TOPICS: "dbserver1.public.stock:1:1"
31+
postgres:
32+
image: debezium/postgres:14-alpine
33+
ports:
34+
- "5432:5432"
35+
environment:
36+
POSTGRES_USER: "postgres"
37+
POSTGRES_PASSWORD: "postgres"
38+
POSTGRES_DB: "postgres"
39+
healthcheck:
40+
test: [ "CMD-SHELL", "pg_isready -d postgres -U postgres" ]
41+
interval: 10s
42+
timeout: 5s
43+
retries: 5
44+
producer:
45+
image: kafka-questdb-connector-samples-stocks-generator
46+
build:
47+
dockerfile: ./Dockerfile
48+
context: .
49+
depends_on:
50+
postgres:
51+
condition: service_healthy
52+
links:
53+
- postgres:postgres
54+
connect:
55+
image: kafka-questdb-connector-samples-connect
56+
build:
57+
dockerfile: ./Dockerfile-Samples
58+
context: ./../../
59+
ports:
60+
- "8083:8083"
61+
depends_on:
62+
- kafka
63+
links:
64+
- kafka:kafka
65+
environment:
66+
BOOTSTRAP_SERVERS: "kafka:9092"
67+
CONFIG_STORAGE_TOPIC: "debezium_connect_config"
68+
OFFSET_STORAGE_TOPIC: "debezium_connect_offsets"
69+
STATUS_STORAGE_TOPIC: "debezium_connect_status"
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
6+
<groupId>org.questdb</groupId>
7+
<modelVersion>4.0.0</modelVersion>
8+
<version>0.1-SNAPSHOT</version>
9+
10+
<artifactId>kafka-samples-stocks</artifactId>
11+
12+
<properties>
13+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14+
<maven.compiler.source>8</maven.compiler.source>
15+
<maven.compiler.target>8</maven.compiler.target>
16+
<spring.boot.version>2.7.4</spring.boot.version>
17+
<testcontainers.version>1.17.3</testcontainers.version>
18+
19+
</properties>
20+
21+
<dependencyManagement>
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-dependencies</artifactId>
26+
<version>${spring.boot.version}</version>
27+
<type>pom</type>
28+
<scope>import</scope>
29+
</dependency>
30+
</dependencies>
31+
</dependencyManagement>
32+
33+
34+
<dependencies>
35+
<dependency>
36+
<groupId>org.springframework</groupId>
37+
<artifactId>spring-test</artifactId>
38+
<version>5.3.23</version>
39+
<scope>test</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.springframework.boot</groupId>
43+
<artifactId>spring-boot-starter-jdbc</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.springframework.data</groupId>
47+
<artifactId>spring-data-jdbc</artifactId>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.springframework.boot</groupId>
51+
<artifactId>spring-boot-test</artifactId>
52+
<scope>test</scope>
53+
</dependency>
54+
<dependency>
55+
<groupId>org.postgresql</groupId>
56+
<artifactId>postgresql</artifactId>
57+
<version>42.5.0</version>
58+
</dependency>
59+
<dependency>
60+
<groupId>org.testcontainers</groupId>
61+
<artifactId>testcontainers</artifactId>
62+
<scope>test</scope>
63+
<version>${testcontainers.version}</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.testcontainers</groupId>
67+
<artifactId>postgresql</artifactId>
68+
<scope>test</scope>
69+
<version>${testcontainers.version}</version>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.testcontainers</groupId>
73+
<artifactId>junit-jupiter</artifactId>
74+
<scope>test</scope>
75+
<version>${testcontainers.version}</version>
76+
</dependency>
77+
</dependencies>
78+
79+
<build>
80+
<plugins>
81+
<plugin>
82+
<groupId>org.springframework.boot</groupId>
83+
<artifactId>spring-boot-maven-plugin</artifactId>
84+
<configuration>
85+
<mainClass>io.questdb.kafka.samples.StocksApp</mainClass>
86+
</configuration>
87+
<executions>
88+
<execution>
89+
<goals>
90+
<goal>repackage</goal>
91+
</goals>
92+
</execution>
93+
</executions>
94+
</plugin>
95+
</plugins>
96+
</build>
97+
98+
</project>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Postgres -> Kafka:
2+
```shell
3+
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors
4+
```
5+
6+
Kafka -> QuestDB
7+
```shell
8+
curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.add.fields": "source.ts_ms", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "__source_ts_ms"}}' localhost:8083/connectors
9+
```
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.questdb.kafka.samples;
2+
3+
import org.springframework.data.annotation.Id;
4+
5+
public class Stock {
6+
private @Id Long id;
7+
private String symbol;
8+
private double price;
9+
10+
public String getSymbol() {
11+
return symbol;
12+
}
13+
14+
public void setSymbol(String symbol) {
15+
this.symbol = symbol;
16+
}
17+
18+
public double getPrice() {
19+
return price;
20+
}
21+
22+
public void setPrice(double price) {
23+
this.price = price;
24+
}
25+
26+
public Long getId() {
27+
return id;
28+
}
29+
30+
public void setId(long id) {
31+
this.id = id;
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "Stock{" +
37+
"id=" + id +
38+
", symbol='" + symbol + '\'' +
39+
", price=" + price +
40+
'}';
41+
}
42+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.questdb.kafka.samples;
2+
3+
4+
import org.springframework.data.jdbc.core.JdbcAggregateOperations;
5+
import org.springframework.data.jdbc.core.convert.JdbcConverter;
6+
import org.springframework.data.jdbc.repository.query.Modifying;
7+
import org.springframework.data.jdbc.repository.query.Query;
8+
import org.springframework.data.jdbc.repository.support.SimpleJdbcRepository;
9+
import org.springframework.data.mapping.PersistentEntity;
10+
import org.springframework.data.repository.CrudRepository;
11+
import org.springframework.data.repository.query.Param;
12+
import org.springframework.stereotype.Repository;
13+
14+
import java.util.List;
15+
16+
@Repository
17+
public interface StockRepository extends CrudRepository<Stock, String> {
18+
19+
@Modifying
20+
@Query("UPDATE stock SET price = price * :factor WHERE symbol = :symbol")
21+
boolean updateBySymbol(@Param("symbol") String symbol, @Param("factor") double factor);
22+
23+
@Query("SELECT symbol FROM stock")
24+
List<String> findAllSymbols();
25+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.questdb.kafka.samples;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.scheduling.annotation.Scheduled;
5+
import org.springframework.stereotype.Service;
6+
7+
import javax.annotation.PostConstruct;
8+
import java.util.Random;
9+
10+
@Service
11+
public class StockService {
12+
@Autowired
13+
private StockRepository stockRepository;
14+
private String[] allSymbols;
15+
private final Random random = new Random();
16+
17+
@PostConstruct
18+
public void init() {
19+
this.allSymbols = stockRepository.findAllSymbols().toArray(new String[0]);
20+
}
21+
22+
@Scheduled(fixedRate = 1)
23+
public void tick() {
24+
for (String symbol : allSymbols) {
25+
double factor = 1 + (random.nextGaussian() / 100);
26+
stockRepository.updateBySymbol(symbol, factor);
27+
// stockRepository.updateBySymbol(symbol, 1.0 + Math.random() * 0.1);
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)