Skip to content

Commit 4a398a4

Browse files
committed
[chore] add Avro integration test
1 parent f131e1f commit 4a398a4

File tree

5 files changed

+347
-0
lines changed

5 files changed

+347
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kafka-integration-tests</artifactId>
        <groupId>org.questdb</groupId>
        <version>0.3-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka-it-avro-schema-registry</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single source of truth for Avro: the runtime library and the
             code-generation plugin below must stay on the same version. -->
        <avro.version>1.11.1</avro.version>
    </properties>
    <dependencies>
        <!-- Confluent Avro serializer + Schema Registry client used by the test producer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <!-- connector test-jar (shared test utilities) and the connector itself -->
        <dependency>
            <groupId>org.questdb</groupId>
            <artifactId>kafka-questdb-connector</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
            <classifier>tests</classifier>
        </dependency>
        <dependency>
            <groupId>org.questdb</groupId>
            <artifactId>kafka-questdb-connector</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Testcontainers modules; versions are expected to be managed by the parent POM -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- provides DebeziumContainer, used here to run Kafka Connect -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-testing-testcontainers</artifactId>
            <version>1.9.5.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.questdb</groupId>
            <artifactId>kafka-it-common</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Generates Java classes (e.g. Student) from the .avsc schemas
                 in src/main/resources/avro during generate-sources. -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
                            <imports>
                                <import>${project.basedir}/src/main/resources/avro/Student.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <!-- Confluent artifacts (kafka-avro-serializer) are not on Maven Central -->
    <repositories>
        <repository>
            <id>confluent</id>
            <name>Confluent</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

</project>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"namespace": "io.questdb.kafka.domain",
3+
"type": "record",
4+
"name": "Student",
5+
"version": 1,
6+
"fields": [
7+
{
8+
"name": "firstname",
9+
"type": "string"
10+
},
11+
{
12+
"name": "lastname",
13+
"type": "string"
14+
},
15+
{
16+
"name": "birthday",
17+
"type": [
18+
"null",
19+
{
20+
"type": "long",
21+
"logicalType": "timestamp-millis"
22+
}
23+
]
24+
},
25+
{
26+
"name": "active",
27+
"type": ["null", "boolean"],
28+
"default": null
29+
}
30+
]
31+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"namespace": "io.questdb.kafka.domain",
3+
"type": "record",
4+
"name": "Student",
5+
"fields": [
6+
{
7+
"name": "firstname",
8+
"type": "string"
9+
},
10+
{
11+
"name": "lastname",
12+
"type": "string"
13+
},
14+
{
15+
"name": "birthday",
16+
"type": [
17+
"null",
18+
{
19+
"type": "long",
20+
"logicalType": "timestamp-millis"
21+
}
22+
]
23+
}
24+
]
25+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package io.questdb.kafka;
2+
3+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
4+
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
5+
import io.debezium.testing.testcontainers.ConnectorConfiguration;
6+
import io.debezium.testing.testcontainers.DebeziumContainer;
7+
import io.questdb.client.Sender;
8+
import io.questdb.kafka.domain.Student;
9+
import org.apache.avro.Schema;
10+
import org.apache.avro.generic.GenericData;
11+
import org.apache.avro.generic.GenericRecord;
12+
import org.apache.kafka.clients.producer.KafkaProducer;
13+
import org.apache.kafka.clients.producer.Producer;
14+
import org.apache.kafka.clients.producer.ProducerConfig;
15+
import org.apache.kafka.clients.producer.ProducerRecord;
16+
import org.apache.kafka.common.serialization.StringSerializer;
17+
import org.jetbrains.annotations.NotNull;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.extension.RegisterExtension;
20+
import org.slf4j.LoggerFactory;
21+
import org.testcontainers.containers.GenericContainer;
22+
import org.testcontainers.containers.KafkaContainer;
23+
import org.testcontainers.containers.Network;
24+
import org.testcontainers.containers.output.Slf4jLogConsumer;
25+
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
26+
import org.testcontainers.containers.wait.strategy.Wait;
27+
import org.testcontainers.junit.jupiter.Container;
28+
import org.testcontainers.junit.jupiter.Testcontainers;
29+
import org.testcontainers.utility.DockerImageName;
30+
import org.testcontainers.utility.MountableFile;
31+
32+
import java.time.Instant;
33+
import java.util.Properties;
34+
35+
import static java.time.Duration.ofMinutes;
36+
37+
/**
 * Integration test for the QuestDB Kafka Connect sink with Avro payloads and
 * Confluent Schema Registry.
 *
 * <p>It spins up four containers on one shared network: Kafka, QuestDB,
 * Kafka Connect (with the connector JARs copied in), and Schema Registry.
 * The containers are non-static instance fields annotated with
 * {@code @Container}, so each test method runs against a fresh set.
 */
@Testcontainers
public class AvroSchemaRegistryIT {
    // we need to locate JARs with QuestDB client and Kafka Connect Connector,
    // this is later used to copy to the Kafka Connect container
    @RegisterExtension
    public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class);
    @RegisterExtension
    public static JarResolverExtension questdbJarResolver = JarResolverExtension.forClass(Sender.class);

    // single network shared by all containers; they address each other via network aliases
    private final static Network network = Network.newNetwork();

    @Container
    private final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.0"))
            .withNetwork(network);

    @Container
    private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")
            .withNetwork(network)
            .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
            .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
            .withEnv("QDB_CAIRO_COMMIT_LAG", "100")
            .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");

    // Kafka Connect worker; the connector and QuestDB client JARs are copied onto
    // its plugin path, and startup is gated on the Connect REST API answering 200.
    @Container
    private final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.2.1")
            // reaches Kafka via its first network alias on the internal listener port
            .withEnv("CONNECT_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092")
            .withEnv("CONNECT_GROUP_ID", "test")
            .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")
            .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")
            .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")
            // worker-level defaults; the Avro converter is set per-connector in startConnector()
            .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
            .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter")
            .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false")
            .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")
            // single-broker cluster, so replication factor must be 1
            .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
            .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
            .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
            .withNetwork(network)
            .withExposedPorts(8083)
            .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
            .withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/usr/share/java/kafka/questdb.jar")
            .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("connect")))
            .dependsOn(kafkaContainer, questDBContainer)
            .waitingFor(new HttpWaitStrategy()
                    .forPath("/connectors")
                    .forStatusCode(200)
                    .forPort(8083)
                    .withStartupTimeout(ofMinutes(5)));

    // Confluent Schema Registry; ready once its /subjects endpoint responds
    @Container
    private GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.2.2"))
            .withNetwork(network)
            .withNetworkAliases("schema-registry")
            .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092")
            .withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost")
            .withExposedPorts(8081)
            .dependsOn(kafkaContainer)
            .waitingFor(Wait.forHttp("/subjects"));

    /**
     * Happy path: one Avro-serialized Student record is produced, the sink
     * connector is started, and the row must appear in QuestDB with
     * {@code birthday} mapped to the designated timestamp column.
     */
    @Test
    public void testSmoke() throws Exception {
        String topicName = "mytopic";
        try (Producer<String, Student> producer = new KafkaProducer<>(producerProps())) {
            Student student = Student.newBuilder()
                    .setFirstname("John")
                    .setLastname("Doe")
                    .setBirthday(Instant.parse("2000-01-01T00:00:00Z"))
                    .build();
            producer.send(new ProducerRecord<>(topicName, "foo", student)).get();
        }

        startConnector(topicName);
        // assertSqlEventually polls QuestDB's HTTP endpoint until the expected CSV matches
        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n"
                        + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n",
                "select * from " + topicName);
    }

    /**
     * Schema evolution: first a record with the generated (base) Student schema,
     * then a GenericRecord built from StudentWithExtraColumn.avsc which adds a
     * nullable {@code active} field. The final assertion expects the new
     * {@code active} column in QuestDB, with the pre-existing row showing
     * {@code false} (the value QuestDB reports for that row once the column is
     * added -- presumably the boolean column default; verify against QuestDB docs).
     */
    @Test
    public void testSchemaEvolution() throws Exception {
        String topicName = "mytopic";
        try (Producer<String, Student> producer = new KafkaProducer<>(producerProps())) {
            Student student = Student.newBuilder()
                    .setFirstname("John")
                    .setLastname("Doe")
                    .setBirthday(Instant.parse("2000-01-01T00:00:00Z"))
                    .build();
            producer.send(new ProducerRecord<>(topicName, "foo", student)).get();
        }
        startConnector(topicName);

        // first record lands with the original two-column layout (+ timestamp)
        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n"
                        + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n",
                "select * from " + topicName);

        // produce a second record using the evolved schema loaded at runtime
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(producerProps())) {
            Schema schema = new org.apache.avro.Schema.Parser().parse(getClass().getResourceAsStream("/avro-runtime/StudentWithExtraColumn.avsc"));
            GenericRecord student = new GenericData.Record(schema);
            student.put("firstname", "Mary");
            student.put("lastname", "Doe");
            // birthday is a timestamp-millis logical type, so pass epoch millis
            student.put("birthday", Instant.parse("2005-01-01T00:00:00Z").toEpochMilli());
            student.put("active", true);
            producer.send(new ProducerRecord<>(topicName, "foo", student)).get();
        }
        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\",\"active\"\r\n"
                        + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\",false\r\n"
                        + "\"Mary\",\"Doe\",\"2005-01-01T00:00:00.000000Z\",true\r\n",
                "select * from " + topicName);
    }

    /**
     * Registers the QuestDB sink connector for the given topic, configured with
     * the Avro value converter pointing at the Schema Registry's in-network
     * address, and {@code birthday} as the designated timestamp column.
     */
    private void startConnector(String topicName) {
        ConnectorConfiguration connector = ConnectorConfiguration.create()
                .with("connector.class", QuestDBSinkConnector.class.getName())
                .with("tasks.max", "1")
                .with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
                .with("value.converter", "io.confluent.connect.avro.AvroConverter")
                // Connect runs inside the Docker network, so use the alias + internal port
                .with("value.converter.schema.registry.url", "http://" + schemaRegistry.getNetworkAliases().get(0) + ":8081")
                .with("topics", topicName)
                .with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birthday")
                .with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false")
                .with("host", questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_ILP_PORT);
        connectContainer.registerConnector("my-connector", connector);
    }

    /**
     * Producer configuration: String keys, Avro values serialized through the
     * Schema Registry. The producer runs on the host, so it uses the registry's
     * host-mapped port rather than the in-network alias.
     */
    @NotNull
    private Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort());
        return props;
    }
}

integration-tests/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<module>cp-server</module>
1313
<module>debezium</module>
1414
<module>commons</module>
15+
<module>avro-schema-registry</module>
1516
</modules>
1617

1718
<packaging>pom</packaging>

0 commit comments

Comments
 (0)