Skip to content

Commit cd65047

Browse files
committed
more tests
1 parent 1bf7483 commit cd65047

File tree

3 files changed

+89
-5
lines changed

3 files changed

+89
-5
lines changed

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,35 @@ public void testSmoke() {
114114
"select firstname,lastname,age from " + topicName);
115115
}
116116

117+
@Test
118+
public void testUpfrontTable() {
119+
connect.kafka().createTopic(topicName, 1);
120+
Map<String, String> props = baseConnectorProps(topicName);
121+
connect.configureConnector(CONNECTOR_NAME, props);
122+
assertConnectorTaskRunningEventually();
123+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
124+
.field("firstname", Schema.STRING_SCHEMA)
125+
.field("lastname", Schema.STRING_SCHEMA)
126+
.field("age", Schema.INT8_SCHEMA)
127+
.build();
128+
129+
Struct struct = new Struct(schema)
130+
.put("firstname", "John")
131+
.put("lastname", "Doe")
132+
.put("age", (byte) 42);
133+
134+
QuestDBUtils.assertSql(questDBContainer,
135+
"{\"ddl\":\"OK\"}\n",
136+
"create table " + topicName + " (firstname string, lastname string, age int)",
137+
QuestDBUtils.Endpoint.EXEC);
138+
139+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
140+
141+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n"
142+
+ "\"John\",\"Doe\",42,\"key\"\r\n",
143+
"select * from " + topicName);
144+
}
145+
117146
@Test
118147
public void testDesignatedTimestamp_noSchema_unixEpochMillis() {
119148
connect.kafka().createTopic(topicName, 1);

connector/src/test/java/io/questdb/kafka/QuestDBUtils.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,43 @@
11
package io.questdb.kafka;
22

3+
import okhttp3.ConnectionSpec;
34
import okhttp3.OkHttpClient;
45
import okhttp3.Request;
56
import okhttp3.Response;
67
import org.testcontainers.containers.GenericContainer;
78

89
import java.io.IOException;
910
import java.net.URLEncoder;
11+
import java.util.Collections;
1012
import java.util.concurrent.TimeUnit;
1113

1214
import static org.awaitility.Awaitility.await;
1315
import static org.junit.jupiter.api.Assertions.assertEquals;
1416
import static org.junit.jupiter.api.Assertions.fail;
1517

1618
public final class QuestDBUtils {
19+
public enum Endpoint {
20+
EXPORT("exp"),
21+
EXEC("exec");
22+
23+
private String endpoint;
24+
25+
Endpoint(String endpoint) {
26+
this.endpoint = endpoint;
27+
}
28+
29+
30+
String getEndpoint() {
31+
return endpoint;
32+
}
33+
}
34+
35+
1736
public static final int QUESTDB_ILP_PORT = 9009;
1837
public static final int QUESTDB_HTTP_PORT = 9000;
1938

2039
private static final int QUERY_WAITING_TIME_SECONDS = 30;
21-
private static final OkHttpClient CLIENT = new OkHttpClient();
40+
private static final OkHttpClient CLIENT = new OkHttpClient(new OkHttpClient.Builder().connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT)));
2241

2342
private QuestDBUtils() {
2443

@@ -28,8 +47,12 @@ public static void assertSqlEventually(GenericContainer<?> questdbContainer, Str
2847
await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(questdbContainer, expectedResult, query));
2948
}
3049

31-
private static void assertSql(GenericContainer<?> questdbContainer, String expectedResult, String query) throws IOException {
32-
try (Response response = executeQuery(questdbContainer, query)) {
50+
public static void assertSql(GenericContainer<?> questdbContainer, String expectedResult, String query) {
51+
assertSql(questdbContainer, expectedResult, query, Endpoint.EXPORT);
52+
}
53+
54+
public static void assertSql(GenericContainer<?> questdbContainer, String expectedResult, String query, Endpoint endpoint) {
55+
try (Response response = executeQuery(questdbContainer, query, endpoint)) {
3356
if (response.code() != 200) {
3457
fail("Query failed, returned code " + response.code());
3558
}
@@ -39,14 +62,16 @@ private static void assertSql(GenericContainer<?> questdbContainer, String expec
3962
assertEquals(expectedResult, bodyString);
4063
}
4164
}
65+
} catch (IOException e) {
66+
fail("Query failed", e);
4267
}
4368
}
4469

45-
private static Response executeQuery(GenericContainer<?> questContainer, String query) throws IOException {
70+
private static Response executeQuery(GenericContainer<?> questContainer, String query, Endpoint endpoint) throws IOException {
4671
String encodedQuery = URLEncoder.encode(query, "UTF-8");
4772
String baseUrl = "http://" + questContainer.getHost() + ":" + questContainer.getMappedPort(QUESTDB_HTTP_PORT);
4873
Request request = new Request.Builder()
49-
.url(baseUrl + "/exp?query=" + encodedQuery)
74+
.url(baseUrl + "/" + endpoint.endpoint + "?query=" + encodedQuery)
5075
.build();
5176
return CLIENT.newCall(request).execute();
5277
}

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,36 @@ public void testDate() throws SQLException {
322322
}
323323
}
324324

325+
@Test
326+
public void testDelete() throws SQLException {
327+
String questTableName = "test_delete";
328+
try (Connection connection = getConnection(postgresContainer);
329+
Statement statement = connection.createStatement()) {
330+
startDebeziumConnector();
331+
332+
statement.execute("create schema " + PG_SCHEMA_NAME);
333+
statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");
334+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02')");
335+
336+
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);
337+
questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");
338+
debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);
339+
340+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"
341+
+ "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n",
342+
"select * from " + questTableName);
343+
344+
// delete should be ignored by QuestDB
345+
statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1");
346+
statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', '2021-01-03')");
347+
348+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"
349+
+ "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n"
350+
+ "2,\"Learn Debezium\",\"2021-01-03T00:00:00.000000Z\"\r\n",
351+
"select * from " + questTableName);
352+
}
353+
}
354+
325355
private static void startDebeziumConnector() {
326356
ConnectorConfiguration connector = ConnectorConfiguration
327357
.forJdbcContainer(postgresContainer)

0 commit comments

Comments
 (0)