-
Notifications
You must be signed in to change notification settings - Fork 716
Expand file tree
/
Copy pathKafkaContainer.java
More file actions
69 lines (60 loc) · 2.45 KB
/
KafkaContainer.java
File metadata and controls
69 lines (60 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.kafka.clients;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.InternetProtocol;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import static org.testcontainers.utility.DockerImageName.parse;
final class KafkaContainer extends GenericContainer<KafkaContainer> {
static final Logger LOGGER = LoggerFactory.getLogger(KafkaContainer.class);
static final int KAFKA_PORT = 19092;
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.6.0"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
withLogConsumer(new Slf4jLogConsumer(LOGGER));
}
String bootstrapServer() {
return getHost() + ":" + getMappedPort(KAFKA_PORT);
}
KafkaProducer<String, String> createStringProducer() {
return new KafkaProducer<>(producerConfig(), new StringSerializer(), new StringSerializer());
}
Properties producerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer());
props.put("acks", "1");
props.put("batch.size", "10");
props.put("client.id", "kafka-extension");
props.put("request.timeout.ms", "500");
return props;
}
KafkaConsumer<String, String> createStringConsumer() {
return new KafkaConsumer<>(consumerConfig(), new StringDeserializer(),
new StringDeserializer());
}
Properties consumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer());
props.put("group.id", "kafka-extension");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "10");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "100");
props.put("session.timeout.ms", "200");
props.put("fetch.max.wait.ms", "200");
props.put("metadata.max.age.ms", "100");
return props;
}
}