Skip to content

Commit 00bf651

Browse files
committed
improve debezium tests
1 parent 8208464 commit 00bf651

File tree

2 files changed

+70
-41
lines changed

2 files changed

+70
-41
lines changed

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void setUp() {
7676
}
7777

7878
@AfterEach
79-
public void tearDown() throws IOException {
79+
public void tearDown() {
8080
connect.stop();
8181
}
8282

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,11 @@
88
import io.questdb.kafka.QuestDBSinkTask;
99
import io.questdb.kafka.QuestDBUtils;
1010
import io.questdb.kafka.JarResolverExtension;
11-
import org.apache.kafka.clients.consumer.ConsumerConfig;
12-
import org.apache.kafka.clients.consumer.ConsumerRecord;
13-
import org.apache.kafka.clients.consumer.KafkaConsumer;
14-
import org.apache.kafka.common.serialization.StringDeserializer;
1511
import org.apache.kafka.connect.json.JsonConverter;
12+
import org.junit.jupiter.api.AfterEach;
1613
import org.junit.jupiter.api.BeforeAll;
1714
import org.junit.jupiter.api.Test;
1815
import org.junit.jupiter.api.extension.RegisterExtension;
19-
import org.rnorth.ducttape.unreliables.Unreliables;
2016
import org.slf4j.LoggerFactory;
2117
import org.testcontainers.containers.GenericContainer;
2218
import org.testcontainers.containers.KafkaContainer;
@@ -33,20 +29,17 @@
3329
import java.sql.DriverManager;
3430
import java.sql.SQLException;
3531
import java.sql.Statement;
36-
import java.sql.Time;
37-
import java.time.Duration;
38-
import java.util.ArrayList;
39-
import java.util.Arrays;
40-
import java.util.List;
41-
import java.util.Properties;
42-
import java.util.UUID;
43-
import java.util.concurrent.TimeUnit;
4432
import java.util.stream.Stream;
4533

4634
import static org.hamcrest.MatcherAssert.assertThat;
4735

4836
@Testcontainers
4937
public class DebeziumIT {
38+
private static final String PG_SCHEMA_NAME = "test";
39+
private static final String PG_TABLE_NAME = "test";
40+
private static final String PG_SERVER_NAME = "dbserver1";
41+
private static final String DEBEZIUM_CONNECTOR_NAME = "debezium_source";
42+
private static final String QUESTDB_CONNECTOR_NAME = "questdb_sink";
5043

5144
// we need to locate JARs with QuestDB client and Kafka Connect Connector,
5245
// this is later used to copy to the Kafka Connect container
@@ -90,49 +83,85 @@ public static void startContainers() {
9083
.join();
9184
}
9285

86+
@AfterEach
87+
public void cleanup() throws SQLException {
88+
debeziumContainer.deleteAllConnectors();
89+
try (Connection connection = getConnection(postgresContainer);
90+
Statement statement = connection.createStatement()) {
91+
statement.execute("drop schema " + PG_SCHEMA_NAME + " CASCADE");
92+
}
93+
}
94+
95+
private static ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) {
96+
ConnectorConfiguration questSink = ConnectorConfiguration.create()
97+
.with("connector.class", QuestDBSinkConnector.class.getName())
98+
.with("host", questDBContainer.getNetworkAliases().get(0))
99+
.with("tasks.max", "1")
100+
.with("topics", PG_SERVER_NAME + "."+ PG_SCHEMA_NAME + "." + PG_TABLE_NAME)
101+
.with(QuestDBSinkConnectorConfig.TABLE_CONFIG, questTableName)
102+
.with("key.converter", JsonConverter.class.getName())
103+
.with("value.converter", JsonConverter.class.getName())
104+
.with("transforms", "unwrap")
105+
.with("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
106+
.with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
107+
return questSink;
108+
}
109+
93110
@Test
94111
public void testSmoke() throws Exception {
95-
String tableName = "todo";
112+
String questTableName = "test_smoke";
96113
try (Connection connection = getConnection(postgresContainer);
97114
Statement statement = connection.createStatement()) {
98115

99-
statement.execute("create schema todo");
100-
statement.execute("create table todo.Todo (id int8 not null, " +
101-
"title varchar(255), primary key (id))");
102-
statement.execute("alter table todo.Todo replica identity full");
103-
statement.execute("insert into todo.Todo values (1, " +
104-
"'Learn CDC')");
105-
statement.execute("insert into todo.Todo values (2, " +
106-
"'Learn Debezium')");
116+
statement.execute("create schema " + PG_SCHEMA_NAME);
117+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");
118+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
119+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
120+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium')");
107121

108-
ConnectorConfiguration connector = ConnectorConfiguration
122+
ConnectorConfiguration debeziumSourceConfig = ConnectorConfiguration
109123
.forJdbcContainer(postgresContainer)
110-
.with("database.server.name", "dbserver1");
124+
.with("database.server.name", PG_SERVER_NAME);
125+
debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, debeziumSourceConfig);
126+
127+
ConnectorConfiguration questSinkConfig = newQuestSinkBaseConfig(questTableName);
128+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSinkConfig);
111129

130+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
131+
+ "1,\"Learn CDC\"\r\n"
132+
+ "2,\"Learn Debezium\"\r\n", "select id, title from " + questTableName);
133+
}
134+
}
112135

113-
debeziumContainer.registerConnector("my-connector", connector);
136+
@Test
137+
public void testSchemaChange() throws Exception {
138+
String questTableName = "test_schema_change";
139+
try (Connection connection = getConnection(postgresContainer);
140+
Statement statement = connection.createStatement()) {
114141

115-
ConnectorConfiguration questSink = ConnectorConfiguration.create()
116-
.with("connector.class", QuestDBSinkConnector.class.getName())
117-
.with("host", questDBContainer.getNetworkAliases().get(0))
118-
.with("tasks.max", "1")
119-
.with("topics", "dbserver1.todo.todo")
120-
.with(QuestDBSinkConnectorConfig.TABLE_CONFIG, tableName)
121-
.with("key.converter", JsonConverter.class.getName())
122-
.with("value.converter", JsonConverter.class.getName())
123-
.with("transforms", "unwrap")
124-
.with("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
125-
.with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
142+
statement.execute("create schema " + PG_SCHEMA_NAME);
143+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");
144+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
145+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
146+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)");
147+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')");
126148

127-
debeziumContainer.registerConnector("questdb-sink", questSink);
149+
ConnectorConfiguration connector = ConnectorConfiguration
150+
.forJdbcContainer(postgresContainer)
151+
.with("database.server.name", PG_SERVER_NAME);
128152

153+
debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, connector);
154+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
155+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
129156

130-
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
131-
+ "1,\"Learn CDC\"\r\n"
132-
+ "2,\"Learn Debezium\"\r\n", "select id, title from " + tableName);
157+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"description\"\r\n"
158+
+ "1,\"Learn CDC\",\r\n"
159+
+ "2,\"Learn Debezium\",\"Best book ever\"\r\n",
160+
"select id, title, description from " + questTableName);
133161
}
134162
}
135163

164+
136165
private static Connection getConnection(
137166
PostgreSQLContainer<?> postgresContainer)
138167
throws SQLException {

0 commit comments

Comments
 (0)