Skip to content

Commit 289435f

Browse files
committed
wip: debezium integration test
1 parent 5032304 commit 289435f

File tree

11 files changed

+323
-26
lines changed

11 files changed

+323
-26
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ private void handleObject(String name, Schema schema, Object value, String fallb
113113
timestampColumnValue = value;
114114
return;
115115
}
116+
if (value == null) {
117+
return;
118+
}
116119
if (tryWriteLogicalType(name.isEmpty() ? fallbackName : name, schema, value)) {
117120
return;
118121
}
@@ -170,29 +173,21 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec
170173
case INT16:
171174
case INT32:
172175
case INT64:
173-
if (value != null) {
174-
Number l = (Number) value;
175-
sender.longColumn(sanitizedName, l.longValue());
176-
}
176+
Number l = (Number) value;
177+
sender.longColumn(sanitizedName, l.longValue());
177178
break;
178179
case FLOAT32:
179180
case FLOAT64:
180-
if (value != null) {
181-
Number d = (Number) value;
182-
sender.doubleColumn(sanitizedName, d.doubleValue());
183-
}
181+
Number d = (Number) value;
182+
sender.doubleColumn(sanitizedName, d.doubleValue());
184183
break;
185184
case BOOLEAN:
186-
if (value != null) {
187-
Boolean b = (Boolean) value;
188-
sender.boolColumn(sanitizedName, b);
189-
}
185+
Boolean b = (Boolean) value;
186+
sender.boolColumn(sanitizedName, b);
190187
break;
191188
case STRING:
192-
if (value != null) {
193-
String s = (String) value;
194-
sender.stringColumn(sanitizedName, s);
195-
}
189+
String s = (String) value;
190+
sender.stringColumn(sanitizedName, s);
196191
break;
197192
case STRUCT:
198193
handleStruct(name, (Struct) value, schema);

connector/src/test/java/io/questdb/kafka/QuestDBUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private QuestDBUtils() {
2424

2525
}
2626

27-
static void assertSqlEventually(GenericContainer<?> questdbContainer, String expectedResult, String query) {
27+
public static void assertSqlEventually(GenericContainer<?> questdbContainer, String expectedResult, String query) {
2828
await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(questdbContainer, expectedResult, query));
2929
}
3030

integration-tests/commons/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>kafka-integration-tests</artifactId>
7+
<groupId>org.questdb</groupId>
8+
<version>0.1-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>kafka-it-common</artifactId>
13+
14+
<properties>
15+
<maven.compiler.source>8</maven.compiler.source>
16+
<maven.compiler.target>8</maven.compiler.target>
17+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.testcontainers</groupId>
23+
<artifactId>junit-jupiter</artifactId>
24+
</dependency>
25+
</dependencies>
26+
</project>

integration-tests/cp-server/src/test/java/io/questdb/kafka/extension/JarResolverExtension.java renamed to integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package io.questdb.kafka.extension;
1+
package io.questdb.kafka;
22

3-
import io.questdb.client.Sender;
43
import org.junit.jupiter.api.extension.AfterAllCallback;
54
import org.junit.jupiter.api.extension.Extension;
65
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -46,7 +45,7 @@ public String getJarPath() {
4645
}
4746

4847
private static String getPathToJarWithClass(Class<?> clazz) {
49-
URL resource = Sender.class.getClassLoader().getResource(clazz.getName().replace(".", "/") + ".class");
48+
URL resource = clazz.getClassLoader().getResource(clazz.getName().replace(".", "/") + ".class");
5049
String stringPath = Objects.requireNonNull(resource, "class " + clazz + " not found").getPath();
5150
stringPath = stringPath.substring("file:".length(), stringPath.indexOf("!"));
5251
Path path = Paths.get(stringPath);

integration-tests/cp-server/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<groupId>org.apache.kafka</groupId>
2929
<artifactId>connect-api</artifactId>
3030
<version>${kafka.version}</version>
31-
<scope>provided</scope>
31+
<scope>test</scope>
3232
</dependency>
3333
<dependency>
3434
<groupId>org.testcontainers</groupId>
@@ -61,6 +61,11 @@
6161
<artifactId>slf4j-log4j12</artifactId>
6262
<scope>test</scope>
6363
</dependency>
64+
<dependency>
65+
<groupId>org.questdb</groupId>
66+
<artifactId>kafka-it-common</artifactId>
67+
<version>${project.version}</version>
68+
</dependency>
6469
</dependencies>
6570

6671
</project>

integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
import org.apache.kafka.clients.producer.ProducerConfig;
99
import org.apache.kafka.clients.producer.ProducerRecord;
1010
import org.apache.kafka.common.serialization.StringSerializer;
11-
import org.apache.kafka.connect.storage.StringConverter;
1211
import org.junit.jupiter.api.Test;
1312
import org.junit.jupiter.api.extension.RegisterExtension;
14-
import io.questdb.kafka.extension.JarResolverExtension;
1513
import org.slf4j.LoggerFactory;
1614
import org.testcontainers.containers.GenericContainer;
1715
import org.testcontainers.containers.KafkaContainer;
@@ -92,8 +90,8 @@ public void test() throws Exception {
9290
ConnectorConfiguration connector = ConnectorConfiguration.create()
9391
.with("connector.class", QuestDBSinkConnector.class.getName())
9492
.with("tasks.max", "1")
95-
.with("key.converter", StringConverter.class.getName())
96-
.with("value.converter", StringConverter.class.getName())
93+
.with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
94+
.with("value.converter", "org.apache.kafka.connect.storage.StringConverter")
9795
.with("topics", topicName)
9896
.with("host", questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_ILP_PORT);
9997

integration-tests/debezium/pom.xml

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>kafka-integration-tests</artifactId>
7+
<groupId>org.questdb</groupId>
8+
<version>0.1-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>kafka-it-debezium</artifactId>
13+
14+
<properties>
15+
<maven.compiler.source>8</maven.compiler.source>
16+
<maven.compiler.target>8</maven.compiler.target>
17+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18+
<debezium.version>1.9.6.Final</debezium.version>
19+
</properties>
20+
21+
<dependencies>
22+
<dependency>
23+
<groupId>io.debezium</groupId>
24+
<artifactId>debezium-testing-testcontainers</artifactId>
25+
<version>${debezium.version}</version>
26+
<scope>test</scope>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.questdb</groupId>
30+
<artifactId>kafka-questdb-connector</artifactId>
31+
<version>${project.version}</version>
32+
<scope>test</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.questdb</groupId>
36+
<artifactId>kafka-questdb-connector</artifactId>
37+
<version>0.1-SNAPSHOT</version>
38+
<scope>test</scope>
39+
<classifier>tests</classifier>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.testcontainers</groupId>
43+
<artifactId>kafka</artifactId>
44+
<scope>test</scope>
45+
</dependency>
46+
<dependency>
47+
<groupId>org.testcontainers</groupId>
48+
<artifactId>postgresql</artifactId>
49+
<scope>test</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.slf4j</groupId>
53+
<artifactId>slf4j-api</artifactId>
54+
<scope>provided</scope>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.slf4j</groupId>
58+
<artifactId>slf4j-log4j12</artifactId>
59+
<scope>test</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>io.debezium</groupId>
63+
<artifactId>debezium-testing-testcontainers</artifactId>
64+
<version>1.9.5.Final</version>
65+
<scope>test</scope>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.apache.kafka</groupId>
69+
<artifactId>connect-json</artifactId>
70+
<scope>test</scope>
71+
<version>${kafka.version}</version>
72+
</dependency>
73+
<dependency>
74+
<groupId>com.jayway.jsonpath</groupId>
75+
<artifactId>json-path</artifactId>
76+
<version>2.7.0</version>
77+
</dependency>
78+
<dependency>
79+
<groupId>org.postgresql</groupId>
80+
<artifactId>postgresql</artifactId>
81+
<version>42.4.2</version>
82+
</dependency>
83+
<dependency>
84+
<groupId>org.testcontainers</groupId>
85+
<artifactId>junit-jupiter</artifactId>
86+
<scope>test</scope>
87+
</dependency>
88+
<dependency>
89+
<groupId>org.questdb</groupId>
90+
<artifactId>kafka-it-common</artifactId>
91+
<version>${project.version}</version>
92+
</dependency>
93+
</dependencies>
94+
95+
</project>
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package kafka;
2+
3+
import io.debezium.testing.testcontainers.ConnectorConfiguration;
4+
import io.debezium.testing.testcontainers.DebeziumContainer;
5+
import io.questdb.client.Sender;
6+
import io.questdb.kafka.QuestDBSinkConnector;
7+
import io.questdb.kafka.QuestDBSinkConnectorConfig;
8+
import io.questdb.kafka.QuestDBSinkTask;
9+
import io.questdb.kafka.QuestDBUtils;
10+
import io.questdb.kafka.JarResolverExtension;
11+
import org.apache.kafka.clients.consumer.ConsumerConfig;
12+
import org.apache.kafka.clients.consumer.ConsumerRecord;
13+
import org.apache.kafka.clients.consumer.KafkaConsumer;
14+
import org.apache.kafka.common.serialization.StringDeserializer;
15+
import org.apache.kafka.connect.json.JsonConverter;
16+
import org.junit.jupiter.api.BeforeAll;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.RegisterExtension;
19+
import org.rnorth.ducttape.unreliables.Unreliables;
20+
import org.slf4j.LoggerFactory;
21+
import org.testcontainers.containers.GenericContainer;
22+
import org.testcontainers.containers.KafkaContainer;
23+
import org.testcontainers.containers.Network;
24+
import org.testcontainers.containers.PostgreSQLContainer;
25+
import org.testcontainers.containers.output.Slf4jLogConsumer;
26+
import org.testcontainers.junit.jupiter.Container;
27+
import org.testcontainers.junit.jupiter.Testcontainers;
28+
import org.testcontainers.lifecycle.Startables;
29+
import org.testcontainers.utility.DockerImageName;
30+
import org.testcontainers.utility.MountableFile;
31+
32+
import java.sql.Connection;
33+
import java.sql.DriverManager;
34+
import java.sql.SQLException;
35+
import java.sql.Statement;
36+
import java.sql.Time;
37+
import java.time.Duration;
38+
import java.util.ArrayList;
39+
import java.util.Arrays;
40+
import java.util.List;
41+
import java.util.Properties;
42+
import java.util.UUID;
43+
import java.util.concurrent.TimeUnit;
44+
import java.util.stream.Stream;
45+
46+
import static org.hamcrest.MatcherAssert.assertThat;
47+
48+
@Testcontainers
49+
public class DebeziumIT {
50+
51+
// we need to locate JARs with QuestDB client and Kafka Connect Connector,
52+
// this is later used to copy to the Kafka Connect container
53+
@RegisterExtension
54+
public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class);
55+
@RegisterExtension
56+
public static JarResolverExtension questdbJarResolver = JarResolverExtension.forClass(Sender.class);
57+
58+
private static Network network = Network.newNetwork();
59+
60+
private static KafkaContainer kafkaContainer = new KafkaContainer()
61+
.withNetwork(network);
62+
63+
public static PostgreSQLContainer<?> postgresContainer =
64+
new PostgreSQLContainer<>(DockerImageName.parse("debezium/postgres:11").asCompatibleSubstituteFor("postgres"))
65+
.withNetwork(network)
66+
.withNetworkAliases("postgres");
67+
68+
public static DebeziumContainer debeziumContainer =
69+
new DebeziumContainer("debezium/connect:1.9.6.Final")
70+
.withNetwork(network)
71+
.withKafka(kafkaContainer)
72+
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb-connector.jar")
73+
.withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb.jar")
74+
.dependsOn(kafkaContainer)
75+
.withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true")
76+
.withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true");
77+
78+
@Container
79+
private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")
80+
.withNetwork(network)
81+
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
82+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
83+
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
84+
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
85+
86+
@BeforeAll
87+
public static void startContainers() {
88+
Startables.deepStart(Stream.of(
89+
kafkaContainer, postgresContainer, debeziumContainer))
90+
.join();
91+
}
92+
93+
@Test
94+
public void testSmoke() throws Exception {
95+
String tableName = "todo";
96+
try (Connection connection = getConnection(postgresContainer);
97+
Statement statement = connection.createStatement()) {
98+
99+
statement.execute("create schema todo");
100+
statement.execute("create table todo.Todo (id int8 not null, " +
101+
"title varchar(255), primary key (id))");
102+
statement.execute("alter table todo.Todo replica identity full");
103+
statement.execute("insert into todo.Todo values (1, " +
104+
"'Learn CDC')");
105+
statement.execute("insert into todo.Todo values (2, " +
106+
"'Learn Debezium')");
107+
108+
ConnectorConfiguration connector = ConnectorConfiguration
109+
.forJdbcContainer(postgresContainer)
110+
.with("database.server.name", "dbserver1");
111+
112+
113+
debeziumContainer.registerConnector("my-connector", connector);
114+
115+
ConnectorConfiguration questSink = ConnectorConfiguration.create()
116+
.with("connector.class", QuestDBSinkConnector.class.getName())
117+
.with("host", questDBContainer.getNetworkAliases().get(0))
118+
.with("tasks.max", "1")
119+
.with("topics", "dbserver1.todo.todo")
120+
.with(QuestDBSinkConnectorConfig.TABLE_CONFIG, tableName)
121+
.with("key.converter", JsonConverter.class.getName())
122+
.with("value.converter", JsonConverter.class.getName())
123+
.with("transforms", "unwrap")
124+
.with("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
125+
.with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
126+
127+
debeziumContainer.registerConnector("questdb-sink", questSink);
128+
129+
130+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
131+
+ "1,\"Learn CDC\"\r\n"
132+
+ "2,\"Learn Debezium\"\r\n", "select id, title from " + tableName);
133+
}
134+
}
135+
136+
private static Connection getConnection(
137+
PostgreSQLContainer<?> postgresContainer)
138+
throws SQLException {
139+
140+
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
141+
postgresContainer.getUsername(),
142+
postgresContainer.getPassword());
143+
}
144+
}

0 commit comments

Comments
 (0)