Skip to content

Commit c8a77f1

Browse files
committed
improve testSchemaChange()
alter the schema after making sure rows with the original schema made it through
1 parent 00bf651 commit c8a77f1

File tree

1 file changed

+19
-13
lines changed

1 file changed

+19
-13
lines changed

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,15 @@ public void testSmoke() throws Exception {
118118
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
119119
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
120120
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium')");
121-
122-
ConnectorConfiguration debeziumSourceConfig = ConnectorConfiguration
123-
.forJdbcContainer(postgresContainer)
124-
.with("database.server.name", PG_SERVER_NAME);
125-
debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, debeziumSourceConfig);
121+
startDebeziumConnector();
126122

127123
ConnectorConfiguration questSinkConfig = newQuestSinkBaseConfig(questTableName);
128124
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSinkConfig);
129125

130126
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
131127
+ "1,\"Learn CDC\"\r\n"
132-
+ "2,\"Learn Debezium\"\r\n", "select id, title from " + questTableName);
128+
+ "2,\"Learn Debezium\"\r\n",
129+
"select id, title from " + questTableName);
133130
}
134131
}
135132

@@ -143,24 +140,33 @@ public void testSchemaChange() throws Exception {
143140
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");
144141
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");
145142
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");
146-
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)");
147-
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')");
148143

149-
ConnectorConfiguration connector = ConnectorConfiguration
150-
.forJdbcContainer(postgresContainer)
151-
.with("database.server.name", PG_SERVER_NAME);
152-
153-
debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, connector);
144+
startDebeziumConnector();
145+
154146
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
155147
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
156148

149+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"
150+
+ "1,\"Learn CDC\"\r\n",
151+
"select id, title from " + questTableName);
152+
153+
statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)");
154+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')");
155+
157156
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"description\"\r\n"
158157
+ "1,\"Learn CDC\",\r\n"
159158
+ "2,\"Learn Debezium\",\"Best book ever\"\r\n",
160159
"select id, title, description from " + questTableName);
161160
}
162161
}
163162

163+
private static void startDebeziumConnector() {
164+
ConnectorConfiguration connector = ConnectorConfiguration
165+
.forJdbcContainer(postgresContainer)
166+
.with("database.server.name", PG_SERVER_NAME);
167+
debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, connector);
168+
}
169+
164170

165171
private static Connection getConnection(
166172
PostgreSQLContainer<?> postgresContainer)

0 commit comments

Comments
 (0)