commit 99c9168630

apache-kafka/README.md (new file, +18 lines)
@@ -0,0 +1,18 @@
## Apache Kafka

This module contains articles about Apache Kafka.

### Relevant articles

- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)

##### Building the project

You can build the project from the command line using: *mvn clean install*, or in an IDE.
apache-kafka/pom.xml (new file, +180 lines)
@@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>apache-kafka</artifactId>
    <name>apache-kafka</name>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility-proxy</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>${testcontainers-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${testcontainers-jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>${com.datastax.spark.spark-cassandra-connector.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.11</artifactId>
            <version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
        </dependency>
    </dependencies>

    <properties>
        <assertj.version>3.6.2</assertj.version>
        <kafka.version>2.8.0</kafka.version>
        <testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
        <testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
        <flink.version>1.5.0</flink.version>
        <awaitility.version>3.0.0</awaitility.version>
        <guava.version>29.0-jre</guava.version>
        <org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
        <graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
        <com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
        <com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
    </properties>

</project>
FlinkDataPipeline.java (new file, +70 lines)
@@ -0,0 +1,70 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;

public class FlinkDataPipeline {

    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

        stringInputStream.map(new WordsCapitalizer())
            .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
        FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);

        inputMessagesStream.timeWindowAll(Time.hours(24))
            .aggregate(new BackupAggregator())
            .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }

}
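The pipeline's map step uses a `WordsCapitalizer` operator that this commit does not show. Assuming it is a plain Flink `MapFunction<String, String>` (which is what `stringInputStream.map(...)` requires), a minimal sketch could look like the following; the body is illustrative, not the committed source:

```java
package com.baeldung.flink.operator;

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical reconstruction: upper-cases each record read from the input topic.
public class WordsCapitalizer implements MapFunction<String, String> {

    @Override
    public String map(String s) {
        // Upper-case the whole message before it is written to the output topic.
        return s.toUpperCase();
    }
}
```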
Consumers.java:
@@ -9,23 +9,20 @@ import java.util.Properties;
 public class Consumers {
 
-    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
-            String topic, String kafkaAddress, String kafkaGroup ) {
+    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
         Properties props = new Properties();
         props.setProperty("bootstrap.servers", kafkaAddress);
-        props.setProperty("group.id",kafkaGroup);
-        FlinkKafkaConsumer011<String> consumer =
-                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
-
+        props.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
         return consumer;
     }
 
-    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
+    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
         Properties properties = new Properties();
         properties.setProperty("bootstrap.servers", kafkaAddress);
-        properties.setProperty("group.id",kafkaGroup);
-        FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
-                topic, new InputMessageDeserializationSchema(),properties);
-
+        properties.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
         return consumer;
     }
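`Consumers` has a `Producers` counterpart, statically imported by `FlinkDataPipeline`, that is not part of this diff. A sketch of the string-producer factory it must expose, assuming the standard three-argument `FlinkKafkaProducer011` constructor (illustration only, not the committed source):

```java
package com.baeldung.flink.connector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Producers {

    // Sketch: mirrors Consumers.createStringConsumerForTopic on the producing side.
    // A createBackupProducer variant would pass a BackupSerializationSchema instead.
    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
    }
}
```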
InputMessage.java:
@@ -18,6 +18,7 @@ public class InputMessage {
     public String getSender() {
         return sender;
     }
+
     public void setSender(String sender) {
         this.sender = sender;
     }
@@ -55,12 +56,14 @@ public class InputMessage {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         InputMessage message1 = (InputMessage) o;
         return Objects.equal(sender, message1.sender) &&
                 Objects.equal(recipient, message1.recipient) &&
                 Objects.equal(sentAt, message1.sentAt) &&
                 Objects.equal(message, message1.message);
     }
 
BackupAggregator.java (new file, +34 lines)
@@ -0,0 +1,34 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        Backup backup = new Backup(inputMessages, LocalDateTime.now());
        return backup;
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}
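The aggregator's output type, `Backup`, lives in `com.baeldung.flink.model` and is not shown in this diff. A minimal sketch that satisfies the `new Backup(inputMessages, LocalDateTime.now())` call above; every name beyond the constructor arguments is an assumption:

```java
package com.baeldung.flink.model;

import java.time.LocalDateTime;
import java.util.List;

// Hypothetical reconstruction of the model emitted by BackupAggregator.
public class Backup {

    private List<InputMessage> inputMessages;
    private LocalDateTime backupTimestamp; // assumed field name

    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
    }

    public List<InputMessage> getInputMessages() {
        return inputMessages;
    }
}
```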
InputMessageTimestampAssigner.java:
@@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks
     @Override
     public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
         ZoneId zoneId = ZoneId.systemDefault();
-        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
+        return element.getSentAt()
+            .atZone(zoneId)
+            .toEpochSecond() * 1000;
     }
 
     @Nullable
BackupSerializationSchema.java:
@@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BackupSerializationSchema
-        implements SerializationSchema<Backup> {
+public class BackupSerializationSchema implements SerializationSchema<Backup> {
 
     static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
 
@@ -18,7 +17,7 @@ public class BackupSerializationSchema
 
     @Override
     public byte[] serialize(Backup backupMessage) {
-        if(objectMapper == null) {
+        if (objectMapper == null) {
             objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
             objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
         }
InputMessageDeserializationSchema.java:
@@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.io.IOException;
 
-public class InputMessageDeserializationSchema implements
-        DeserializationSchema<InputMessage> {
+public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
 
     static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
 
 
     @Override
     public InputMessage deserialize(byte[] bytes) throws IOException {
KafkaTopicApplication.java:
@@ -27,11 +27,11 @@ public class KafkaTopicApplication {
         short replicationFactor = 1;
         NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
 
-        CreateTopicsResult result = admin.createTopics(
-            Collections.singleton(newTopic));
+        CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
 
         // get the async result for the new topic creation
-        KafkaFuture<Void> future = result.values().get(topicName);
+        KafkaFuture<Void> future = result.values()
+            .get(topicName);
 
         // call get() to block until topic creation has completed or failed
         future.get();
@@ -47,15 +47,13 @@ public class KafkaTopicApplication {
         short replicationFactor = 1;
         NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
 
-        CreateTopicsOptions topicOptions = new CreateTopicsOptions()
-            .validateOnly(true)
-            .retryOnQuotaViolation(true);
+        CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true)
+            .retryOnQuotaViolation(true);
 
-        CreateTopicsResult result = admin.createTopics(
-            Collections.singleton(newTopic), topicOptions
-        );
+        CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions);
 
-        KafkaFuture<Void> future = result.values().get(topicName);
+        KafkaFuture<Void> future = result.values()
+            .get(topicName);
         future.get();
     }
 }
@@ -72,14 +70,12 @@ public class KafkaTopicApplication {
         Map<String, String> newTopicConfig = new HashMap<>();
         newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
         newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
-        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
-            .configs(newTopicConfig);
+        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig);
 
-        CreateTopicsResult result = admin.createTopics(
-            Collections.singleton(newTopic)
-        );
+        CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
 
-        KafkaFuture<Void> future = result.values().get(topicName);
+        KafkaFuture<Void> future = result.values()
+            .get(topicName);
         future.get();
     }
 }
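All three hunks above call `admin.createTopics(...)` on a handle whose creation lies outside the diff. For context, an `Admin` client for Kafka 2.8 (the version pinned in the new pom) is typically built as sketched below; the broker address is an assumption:

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminFactory {

    // Sketch: builds the kind of Admin handle KafkaTopicApplication's methods use.
    static Admin createAdmin() {
        Properties properties = new Properties();
        // Assumed broker address; the class under diff does not show its configuration.
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return Admin.create(properties);
    }
}
```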
CountryPopulationConsumer.java:
@@ -19,9 +19,7 @@ public class CountryPopulationConsumer {
     private java.util.function.Consumer<Throwable> exceptionConsumer;
     private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
 
-    public CountryPopulationConsumer(
-        Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
-        java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
+    public CountryPopulationConsumer(Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer, java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
         this.consumer = consumer;
         this.exceptionConsumer = exceptionConsumer;
         this.countryPopulationConsumer = countryPopulationConsumer;
TransactionalMessageProducer.java:
@@ -1,4 +1,4 @@
-package com.baeldung.kafka;
+package com.baeldung.kafka.exactlyonce;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -24,16 +24,16 @@ public class TransactionalMessageProducer {
 
         producer.initTransactions();
 
-        try{
+        try {
 
             producer.beginTransaction();
 
-            Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
-                new ProducerRecord<String, String>("input", null, s)));
+            Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
+                .forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
 
             producer.commitTransaction();
 
-        }catch (KafkaException e){
+        } catch (KafkaException e) {
 
             producer.abortTransaction();
 
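`producer.initTransactions()` above only works if the producer was created with a `transactional.id`. A sketch of the configuration this implies (the factory itself is not part of the diff, and the concrete values are assumptions):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFactory {

    // Sketch: a producer allowed to call initTransactions()/beginTransaction().
    static KafkaProducer<String, String> createTransactionalProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1"); // any stable, unique id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```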
TransactionalWordCount.java:
@@ -1,4 +1,4 @@
-package com.baeldung.kafka;
+package com.baeldung.kafka.exactlyonce;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -43,10 +43,11 @@ public class TransactionalWordCount {
             ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
 
             Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
                 .stream()
-                .flatMap(record -> Stream.of(record.value().split(" ")))
+                .flatMap(record -> Stream.of(record.value()
+                    .split(" ")))
                 .map(word -> Tuple.of(word, 1))
                 .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
 
             producer.beginTransaction();
 
@@ -56,7 +57,8 @@ public class TransactionalWordCount {
 
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
-                long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
+                long offset = partitionedRecords.get(partitionedRecords.size() - 1)
+                    .offset();
 
                 offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
             }
@@ -72,7 +74,6 @@ public class TransactionalWordCount {
 
         }
 
-
     }
 
     private static KafkaConsumer<String, String> createKafkaConsumer() {
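On the reading side of this exactly-once loop, the consumer must skip records from aborted transactions. The diff does not show the body of `createKafkaConsumer()`; a sketch consistent with its use, with assumed values, would set `isolation.level` to `read_committed`:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFactory {

    // Sketch: reads only records from committed transactions, as exactly-once requires.
    static KafkaConsumer<String, String> createKafkaConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // assumed group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
```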
Tuple.java:
@@ -1,4 +1,4 @@
-package com.baeldung.kafka;
+package com.baeldung.kafka.exactlyonce;
 
 public class Tuple {
 
@@ -10,8 +10,8 @@ public class Tuple {
         this.value = value;
     }
 
-    public static Tuple of(String key, Integer value){
-        return new Tuple(key,value);
+    public static Tuple of(String key, Integer value) {
+        return new Tuple(key, value);
     }
 
     public String getKey() {
KafkaProducer.java:
@@ -15,8 +15,7 @@ public class KafkaProducer {
     }
 
     public Future<RecordMetadata> send(String key, String value) {
-        ProducerRecord record = new ProducerRecord("topic_sports_news",
-            key, value);
+        ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
         return producer.send(record);
     }
 
@@ -36,5 +35,4 @@ public class KafkaProducer {
         producer.commitTransaction();
     }
 
-
 }
Kafka Streams vs Kafka Consumer example:
@@ -1,4 +1,4 @@
-package com.baeldung.kafka.streams;
+package com.baeldung.kafka.streamsvsconsumer;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
KafkaStreamsLiveTest.java:
@@ -1,61 +1,78 @@
 package com.baeldung.kafkastreams;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.junit.Ignore;
+import org.junit.Test;
+
 public class KafkaStreamsLiveTest {
     private String bootstrapServers = "localhost:9092";
+    private Path stateDirectory;
 
     @Test
     @Ignore("it needs to have kafka broker running on local")
     public void shouldTestKafkaStreams() throws InterruptedException {
-        //given
+        // given
         String inputTopic = "inputTopic";
 
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
+            .getClass()
+            .getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
+            .getClass()
+            .getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         // Use a temporary directory for storing state, which will be automatically removed after the test.
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        try {
+            this.stateDirectory = Files.createTempDirectory("kafka-streams");
+            streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
+                .toString());
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Cannot create temporary directory", e);
+        }
 
-        //when
-        KStreamBuilder builder = new KStreamBuilder();
+        // when
+        final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream(inputTopic);
         Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
 
-        KTable<String, Long> wordCounts = textLines
-            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
-            .groupBy((key, word) -> word)
-            .count();
+        KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
+            .groupBy((key, word) -> word)
+            .count();
 
-        wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
+        wordCounts.toStream()
+            .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
 
         String outputTopic = "outputTopic";
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-        wordCounts.to(stringSerde, longSerde, outputTopic);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        wordCounts.toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
         streams.start();
 
-        //then
+        // then
         Thread.sleep(30000);
         streams.close();
     }
README.md of a libraries module:
@@ -13,7 +13,6 @@ Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-m
 - [Implementing a FTP-Client in Java](https://www.baeldung.com/java-ftp-client)
 - [Introduction to Functional Java](https://www.baeldung.com/java-functional-library)
 - [A Guide to the Reflections Library](https://www.baeldung.com/reflections-library)
-- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
 - [Introduction to Protonpack](https://www.baeldung.com/java-protonpack)
 - [Java-R Integration](https://www.baeldung.com/java-r-integration)
 - [Using libphonenumber to Validate Phone Numbers](https://www.baeldung.com/java-libphonenumber)
pom.xml of the same libraries module:
@@ -22,18 +22,6 @@
             <artifactId>protonpack</artifactId>
             <version>${protonpack.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-streams</artifactId>
-            <version>${kafka.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-            <classifier>test</classifier>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>io.github.resilience4j</groupId>
             <artifactId>resilience4j-circuitbreaker</artifactId>
@@ -148,7 +136,6 @@
     </build>
 
     <properties>
-        <kafka.version>2.0.0</kafka.version>
         <javapoet.version>1.10.0</javapoet.version>
         <reflections.version>0.9.11</reflections.version>
         <mockftpserver.version>2.7.1</mockftpserver.version>
libraries-data-2/README.md:
@@ -11,8 +11,6 @@ This module contains articles about libraries for data processing in Java.
 - [An Introduction to SuanShu](https://www.baeldung.com/suanshu)
 - [Intro to Derive4J](https://www.baeldung.com/derive4j)
 - [Univocity Parsers](https://www.baeldung.com/java-univocity-parsers)
-- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
-- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
 - More articles: [[<-- prev]](/../libraries-data)
 
 ##### Building the project
libraries-data-2/pom.xml:
@@ -116,11 +116,6 @@
             <artifactId>univocity-parsers</artifactId>
             <version>${univocity.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -144,13 +139,6 @@
             <version>${byte-buddy.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-            <classifier>test</classifier>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <repositories>
@@ -176,7 +164,6 @@
         <slf4j.version>1.7.25</slf4j.version>
         <awaitility.version>3.0.0</awaitility.version>
         <univocity.version>2.8.4</univocity.version>
-        <kafka.version>2.5.0</kafka.version>
         <guava.version>29.0-jre</guava.version>
     </properties>
FlinkDataPipeline.java (old copy, deleted, -82 lines)
@@ -1,82 +0,0 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;

public class FlinkDataPipeline {

    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer =
                createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream =
                environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer =
                createStringProducer(outputTopic, address);

        stringInputStream
                .map(new WordsCapitalizer())
                .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void createBackup () throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
                createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        flinkKafkaConsumer
                .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
        FlinkKafkaProducer011<Backup> flinkKafkaProducer =
                createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream =
                environment.addSource(flinkKafkaConsumer);

        inputMessagesStream
                .timeWindowAll(Time.hours(24))
                .aggregate(new BackupAggregator())
                .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }

}
BackupAggregator.java (old copy, deleted, -34 lines)
@@ -1,34 +0,0 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        Backup backup = new Backup(inputMessages, LocalDateTime.now());
        return backup;
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}
libraries-data-3/README.md (deleted, -11 lines)
@@ -1,11 +0,0 @@
## Data Libraries

This module contains articles about libraries for data processing in Java.

### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- More articles: [[<-- prev]](/../libraries-data-2)

##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE. If you have issues with the derive4j imports in your IDE, you have to add the folder: *target/generated-sources/annotations* to the project build path in your IDE.
libraries-data-3/pom.xml (deleted, -64 lines)
@@ -1,64 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>libraries-data-3</artifactId>
    <name>libraries-data-3</name>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>${testcontainers-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${testcontainers-jupiter.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <properties>
        <assertj.version>3.6.2</assertj.version>
        <slf4j.version>1.7.25</slf4j.version>
        <kafka.version>2.8.0</kafka.version>
        <testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
        <testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
    </properties>

</project>
libraries-data/README.md:
@@ -3,14 +3,10 @@
 This module contains articles about libraries for data processing in Java.
 
 ### Relevant articles
-- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
 - [Introduction to JCache](https://www.baeldung.com/jcache)
 - [A Guide to Apache Ignite](https://www.baeldung.com/apache-ignite)
 - [Apache Ignite with Spring Data](https://www.baeldung.com/apache-ignite-spring-data)
 - [A Guide to Apache Crunch](https://www.baeldung.com/apache-crunch)
 - [Intro to Apache Storm](https://www.baeldung.com/apache-storm)
-- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
-- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
-- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
 - [Guide to JMapper](https://www.baeldung.com/jmapper)
 More articles: [[next -->]](/../libraries-data-2)
pom.xml (+2 lines)
@@ -345,6 +345,7 @@
         <module>antlr</module>
         <module>apache-cxf</module>
+        <module>apache-kafka</module>
         <module>apache-libraries</module>
         <module>apache-olingo/olingo2</module>
         <module>apache-poi</module>
@@ -815,6 +816,7 @@
         <module>antlr</module>
         <module>apache-cxf</module>
+        <module>apache-kafka</module>
         <module>apache-libraries</module>
         <module>apache-olingo/olingo2</module>
         <module>apache-poi</module>