From e828a0de9a14836fe19addd486253f842159e58a Mon Sep 17 00:00:00 2001 From: DomWos Date: Mon, 13 Aug 2018 00:08:55 +0200 Subject: [PATCH] BAEL-1994: Flink and Kafka Data Pipeline --- apache-flink/pom.xml | 37 +++++++++ .../java/com/baeldung/FlinkDataPipeline.java | 81 +++++++++++++++++++ .../com/baeldung/flink/BackupAggregator.java | 34 ++++++++ .../java/com/baeldung/flink/Consumers.java | 32 ++++++++ .../flink/InputMessageTimestampAssigner.java | 23 ++++++ .../java/com/baeldung/flink/Producers.java | 17 ++++ .../com/baeldung/flink/WordsCapitalizer.java | 11 +++ .../main/java/com/baeldung/model/Backup.java | 23 ++++++ .../java/com/baeldung/model/InputMessage.java | 44 ++++++++++ .../schema/BackupSerializationSchema.java | 29 +++++++ .../InputMessageDeserializationSchema.java | 33 ++++++++ 11 files changed, 364 insertions(+) create mode 100644 apache-flink/pom.xml create mode 100644 apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/Consumers.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/Producers.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java create mode 100644 apache-flink/src/main/java/com/baeldung/model/Backup.java create mode 100644 apache-flink/src/main/java/com/baeldung/model/InputMessage.java create mode 100644 apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java create mode 100644 apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java diff --git a/apache-flink/pom.xml b/apache-flink/pom.xml new file mode 100644 index 0000000000..c93ea3ef77 --- /dev/null +++ b/apache-flink/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + groupId + apache-flink + 1.0-SNAPSHOT + + com.baeldung + 
parent-modules + 1.0.0-SNAPSHOT + + + + org.apache.flink + flink-core + 1.5.0 + + + org.apache.flink + flink-connector-kafka-0.11_2.11 + 1.5.0 + + + org.apache.flink + flink-streaming-java_2.11 + 1.5.0 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.9.6 + + + \ No newline at end of file
diff --git a/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java
new file mode 100644
index 0000000000..d4e4cec60b
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java
@@ -0,0 +1,98 @@
+package com.baeldung;
+
+import com.baeldung.flink.BackupAggregator;
+import com.baeldung.flink.InputMessageTimestampAssigner;
+import com.baeldung.flink.WordsCapitalizer;
+import com.baeldung.model.Backup;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+import static com.baeldung.flink.Consumers.*;
+import static com.baeldung.flink.Producers.*;
+
+/**
+ * Flink jobs that read from and write to Kafka.
+ */
+public class FlinkDataPipeline {
+
+    /**
+     * Reads strings from the {@code flink_input} topic starting at the earliest
+     * offset, upper-cases each one and writes it to {@code flink_output}.
+     *
+     * @throws Exception if building or executing the job fails
+     */
+    public static void capitalize() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        String address = "localhost:9092";
+
+        StreamExecutionEnvironment environment =
+          StreamExecutionEnvironment.getExecutionEnvironment();
+
+        FlinkKafkaConsumer011<String> flinkKafkaConsumer =
+          createStringConsumerForTopic(inputTopic, address, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+
+        DataStream<String> stringInputStream =
+          environment.addSource(flinkKafkaConsumer);
+
+        FlinkKafkaProducer011<String> flinkKafkaProducer =
+          createStringProducer(outputTopic, address);
+
+        stringInputStream
+          .map(new WordsCapitalizer())
+          .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
+    /**
+     * Reads {@link InputMessage}s from {@code flink_input} using event time,
+     * groups them into 24-hour windows and writes one {@link Backup} per
+     * window to {@code flink_output}.
+     *
+     * @throws Exception if building or executing the job fails
+     */
+    public static void createBackup() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        String kafkaAddress = "192.168.99.100:9092";
+
+        StreamExecutionEnvironment environment =
+          StreamExecutionEnvironment.getExecutionEnvironment();
+
+        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
+          createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+
+        flinkKafkaConsumer
+          .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
+        FlinkKafkaProducer011<Backup> flinkKafkaProducer =
+          createBackupProducer(outputTopic, kafkaAddress);
+
+        DataStream<InputMessage> inputMessagesStream =
+          environment.addSource(flinkKafkaConsumer);
+
+        inputMessagesStream
+          .timeWindowAll(Time.hours(24))
+          .aggregate(new BackupAggregator())
+          .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
+    /** Entry point: runs the backup job. */
+    public static void main(String[] args) throws Exception {
+        createBackup();
+    }
+
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java
new file mode 100644
index 0000000000..2bfbb1e270
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java
@@ -0,0 +1,38 @@
+package com.baeldung.flink;
+
+import com.baeldung.model.Backup;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects every {@link InputMessage} it is given into a list and wraps the
+ * list in a {@link Backup} stamped with the current local time.
+ */
+public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
+
+    @Override
+    public List<InputMessage> createAccumulator() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
+        inputMessages.add(inputMessage);
+        return inputMessages;
+    }
+
+    @Override
+    public Backup getResult(List<InputMessage> inputMessages) {
+        return new Backup(inputMessages, LocalDateTime.now());
+    }
+
+    @Override
+    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
+        inputMessages.addAll(acc1);
+        return inputMessages;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/Consumers.java b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java
new file mode 100644
index 0000000000..e9aa78b677
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java
@@ -0,0 +1,50 @@
+package com.baeldung.flink;
+
+import com.baeldung.schema.InputMessageDeserializationSchema;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+
+import java.util.Properties;
+
+/**
+ * Factory methods for the Kafka consumers used by the pipeline.
+ */
+public class Consumers {
+
+    /**
+     * Creates a consumer that reads plain strings from {@code topic}.
+     *
+     * @param topic Kafka topic to read from
+     * @param kafkaAddress value for {@code bootstrap.servers}
+     * @param kafkaGroup value for {@code group.id}
+     * @return a consumer using {@link SimpleStringSchema}
+     */
+    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
+      String topic, String kafkaAddress, String kafkaGroup) {
+        return new FlinkKafkaConsumer011<>(
+          topic, new SimpleStringSchema(), kafkaProperties(kafkaAddress, kafkaGroup));
+    }
+
+    /**
+     * Creates a consumer that reads {@link InputMessage}s from {@code topic}.
+     *
+     * @param topic Kafka topic to read from
+     * @param kafkaAddress value for {@code bootstrap.servers}
+     * @param kafkaGroup value for {@code group.id}
+     * @return a consumer using {@link InputMessageDeserializationSchema}
+     */
+    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(
+      String topic, String kafkaAddress, String kafkaGroup) {
+        return new FlinkKafkaConsumer011<>(
+          topic, new InputMessageDeserializationSchema(), kafkaProperties(kafkaAddress, kafkaGroup));
+    }
+
+    /** Builds the connection properties shared by both consumers. */
+    private static Properties kafkaProperties(String kafkaAddress, String kafkaGroup) {
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafkaAddress);
+        properties.setProperty("group.id", kafkaGroup);
+        return properties;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java
b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java
new file mode 100644
index 0000000000..5d58cb36cc
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java
@@ -0,0 +1,29 @@
+package com.baeldung.flink;
+
+import com.baeldung.model.InputMessage;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+
+/**
+ * Uses {@link InputMessage#getSentAt()} as the event timestamp and returns a
+ * watermark slightly behind each extracted timestamp.
+ */
+public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {
+
+    @Override
+    public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
+        ZoneId zoneId = ZoneId.systemDefault();
+        // toEpochMilli() keeps the sub-second part that toEpochSecond() * 1000 dropped.
+        return element.getSentAt().atZone(zoneId).toInstant().toEpochMilli();
+    }
+
+    @Nullable
+    @Override
+    public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
+        // NOTE(review): timestamps are in milliseconds, so this lags by 15 ms - confirm intent.
+        return new Watermark(extractedTimestamp - 15);
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/Producers.java b/apache-flink/src/main/java/com/baeldung/flink/Producers.java
new file mode 100644
index 0000000000..094b6ff211
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/Producers.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink;
+
+import com.baeldung.schema.BackupSerializationSchema;
+import com.baeldung.model.Backup;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+/**
+ * Factory methods for the Kafka producers used by the pipeline.
+ */
+public class Producers {
+
+    /**
+     * Creates a producer that writes plain strings to {@code topic}.
+     *
+     * @param topic Kafka topic to write to
+     * @param kafkaAddress broker address passed to the producer
+     * @return a producer using {@link SimpleStringSchema}
+     */
+    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
+    }
+
+    /**
+     * Creates a producer that writes {@link Backup}s to {@code topic}.
+     *
+     * @param topic Kafka topic to write to
+     * @param kafkaAddress broker address passed to the producer
+     * @return a producer using {@link BackupSerializationSchema}
+     */
+    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
+    }
+}
diff --git 
a/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java
new file mode 100644
index 0000000000..49fffc292e
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java
@@ -0,0 +1,17 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.Locale;
+
+/**
+ * Maps each input string to its upper-case form.
+ */
+public class WordsCapitalizer implements MapFunction<String, String> {
+
+    @Override
+    public String map(String s) {
+        // Locale.ROOT keeps the result independent of the JVM's default locale.
+        return s.toUpperCase(Locale.ROOT);
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/model/Backup.java b/apache-flink/src/main/java/com/baeldung/model/Backup.java
new file mode 100644
index 0000000000..3a57d65d78
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/model/Backup.java
@@ -0,0 +1,31 @@
+package com.baeldung.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A batch of {@link InputMessage}s together with a timestamp and a randomly
+ * generated identifier.
+ */
+public class Backup {
+
+    @JsonProperty("inputMessages")
+    List<InputMessage> inputMessages;
+    @JsonProperty("backupTimestamp")
+    LocalDateTime backupTimestamp;
+    @JsonProperty("uuid")
+    UUID uuid;
+
+    /**
+     * @param inputMessages messages contained in this backup
+     * @param backupTimestamp timestamp recorded for this backup
+     */
+    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
+        this.inputMessages = inputMessages;
+        this.backupTimestamp = backupTimestamp;
+        this.uuid = UUID.randomUUID();
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/model/InputMessage.java b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java
new file mode 100644
index 0000000000..57a85de81a
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java
@@ -0,0 +1,47 @@
+package com.baeldung.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.time.LocalDateTime;
+
+/**
+ * Mutable bean holding a sender, a recipient, a send time and a message text.
+ */
+@JsonSerialize
+public class InputMessage {
+    String sender;
+    String recipient;
+    LocalDateTime sentAt;
+    String message;
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipient() {
+        return recipient;
+    }
+
+    public void setRecipient(String recipient) {
+        this.recipient = recipient;
+    }
+
+    public LocalDateTime getSentAt() {
+        return sentAt;
+    }
+
+    public void setSentAt(LocalDateTime sentAt) {
+        this.sentAt = sentAt;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java
new file mode 100644
index 0000000000..fa525a5ee8
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java
@@ -0,0 +1,37 @@
+package com.baeldung.schema;
+
+import com.baeldung.model.Backup;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes a {@link Backup} to JSON bytes using Jackson.
+ */
+public class BackupSerializationSchema
+  implements SerializationSchema<Backup> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BackupSerializationSchema.class);
+
+    ObjectMapper objectMapper;
+
+    /**
+     * @param backupMessage backup to serialize
+     * @return the UTF-8 encoded JSON, or an empty array if serialization fails
+     */
+    @Override
+    public byte[] serialize(Backup backupMessage) {
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+        }
+        try {
+            // writeValueAsBytes always encodes as UTF-8; String.getBytes() used the platform default.
+            return objectMapper.writeValueAsBytes(backupMessage);
+        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
+            LOG.error("Failed to serialize backup to JSON", e);
+        }
+        return new byte[0];
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java
new file mode 100644
index 0000000000..99b6baa6f4
--- /dev/null
+++ 
b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java
@@ -0,0 +1,43 @@
+package com.baeldung.schema;
+
+import com.baeldung.model.InputMessage;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * Deserializes JSON bytes into {@link InputMessage}s using Jackson.
+ */
+public class InputMessageDeserializationSchema implements
+  DeserializationSchema<InputMessage> {
+
+    ObjectMapper objectMapper;
+
+    /**
+     * @param bytes JSON document describing one message
+     * @return the parsed message
+     * @throws IOException if the bytes cannot be read as an {@link InputMessage}
+     */
+    @Override
+    public InputMessage deserialize(byte[] bytes) throws IOException {
+        // Created on first use rather than in a constructor.
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+        }
+        return objectMapper.readValue(bytes, InputMessage.class);
+    }
+
+    /** Always {@code false}: no element marks the end of the stream. */
+    @Override
+    public boolean isEndOfStream(InputMessage inputMessage) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<InputMessage> getProducedType() {
+        return TypeInformation.of(InputMessage.class);
+    }
+}