|
10 | 10 | import io.questdb.kafka.JarResolverExtension; |
11 | 11 | import org.apache.kafka.connect.json.JsonConverter; |
12 | 12 | import org.junit.jupiter.api.AfterEach; |
| 13 | +import org.junit.jupiter.api.Assertions; |
13 | 14 | import org.junit.jupiter.api.BeforeAll; |
14 | 15 | import org.junit.jupiter.api.Test; |
15 | 16 | import org.junit.jupiter.api.extension.RegisterExtension; |
|
27 | 28 |
|
28 | 29 | import java.sql.Connection; |
29 | 30 | import java.sql.DriverManager; |
| 31 | +import java.sql.PreparedStatement; |
30 | 32 | import java.sql.ResultSet; |
31 | 33 | import java.sql.SQLException; |
32 | 34 | import java.sql.Statement; |
| 35 | +import java.util.concurrent.ThreadLocalRandom; |
33 | 36 | import java.util.stream.Stream; |
34 | 37 |
|
35 | 38 | import static org.hamcrest.MatcherAssert.assertThat; |
| 39 | +import static org.junit.Assert.assertEquals; |
36 | 40 |
|
37 | 41 | @Testcontainers |
38 | 42 | public class DebeziumIT { |
@@ -132,6 +136,68 @@ public void testSmoke() throws Exception { |
132 | 136 | } |
133 | 137 | } |
134 | 138 |
|
| 139 | + @Test |
| 140 | + public void testManyUpdates() throws Exception { |
| 141 | + String questTableName = "test_many_updates"; |
| 142 | + try (Connection connection = getConnection(postgresContainer); |
| 143 | + Statement statement = connection.createStatement()) { |
| 144 | + startDebeziumConnector(); |
| 145 | + ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); |
| 146 | + questSink = questSink.with("transforms.unwrap.add.fields", "source.ts_ms") |
| 147 | + .with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "__source_ts_ms"); |
| 148 | + questSink.with(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "symbol"); |
| 149 | + debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); |
| 150 | + |
| 151 | + statement.execute("create schema " + PG_SCHEMA_NAME); |
| 152 | + statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, symbol varchar(255), price double precision, primary key (id))"); |
| 153 | + statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (0, 'TDB', 1.0)"); |
| 154 | + statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'QDB', 1.0)"); |
| 155 | + statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'IDB', 1.0)"); |
| 156 | + statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (3, 'PDB', 1.0)"); |
| 157 | + statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (4, 'KDB', 1.0)"); |
| 158 | + |
| 159 | + QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"price\"\r\n" |
| 160 | + + "0,\"TDB\",1.0\r\n" |
| 161 | + + "1,\"QDB\",1.0\r\n" |
| 162 | + + "2,\"IDB\",1.0\r\n" |
| 163 | + + "3,\"PDB\",1.0\r\n" |
| 164 | + + "4,\"KDB\",1.0\r\n", |
| 165 | + "select id, symbol, price from " + questTableName); |
| 166 | + |
| 167 | + try (PreparedStatement preparedStatement = connection.prepareStatement("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set price = ? where id = ?")) { |
| 168 | + //a bunch of updates |
| 169 | + for (int i = 0; i < 200_000; i++) { |
| 170 | + int id = ThreadLocalRandom.current().nextInt(5); |
| 171 | + double newPrice = ThreadLocalRandom.current().nextDouble(100); |
| 172 | + preparedStatement.setDouble(1, newPrice); |
| 173 | + preparedStatement.setInt(2, id); |
| 174 | + preparedStatement.addBatch(); |
| 175 | + } |
| 176 | + preparedStatement.executeBatch(); |
| 177 | + // set all prices to a known value, this will be useful in asserting the final state |
| 178 | + for (int i = 0; i < 5; i++) { |
| 179 | + preparedStatement.setDouble(1, 42.0); |
| 180 | + preparedStatement.setInt(2, i); |
| 181 | + Assertions.assertEquals(1, preparedStatement.executeUpdate()); |
| 182 | + } |
| 183 | + } |
| 184 | + |
| 185 | + // all symbols have the last well-known price |
| 186 | + QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"last_price\"\r\n" |
| 187 | + + "0,\"TDB\",42.0\r\n" |
| 188 | + + "1,\"QDB\",42.0\r\n" |
| 189 | + + "2,\"IDB\",42.0\r\n" |
| 190 | + + "3,\"PDB\",42.0\r\n" |
| 191 | + + "4,\"KDB\",42.0\r\n", |
| 192 | + "select id, symbol, last(price) as last_price from " + questTableName); |
| 193 | + |
| 194 | + // total number of rows is equal to the number of updates / inserts |
| 195 | + QuestDBUtils.assertSqlEventually(questDBContainer, "\"count\"\r\n" |
| 196 | + + "200010\r\n", |
| 197 | + "select count() from " + questTableName); |
| 198 | + } |
| 199 | + } |
| 200 | + |
135 | 201 | @Test |
136 | 202 | public void testSchemaChange() throws Exception { |
137 | 203 | String questTableName = "test_schema_change"; |
|
0 commit comments