From e828a0de9a14836fe19addd486253f842159e58a Mon Sep 17 00:00:00 2001 From: DomWos Date: Mon, 13 Aug 2018 00:08:55 +0200 Subject: [PATCH 1/4] BAEL-1994: Flink and Kafka Data Pipeline --- apache-flink/pom.xml | 37 +++++++++ .../java/com/baeldung/FlinkDataPipeline.java | 81 +++++++++++++++++++ .../com/baeldung/flink/BackupAggregator.java | 34 ++++++++ .../java/com/baeldung/flink/Consumers.java | 32 ++++++++ .../flink/InputMessageTimestampAssigner.java | 23 ++++++ .../java/com/baeldung/flink/Producers.java | 17 ++++ .../com/baeldung/flink/WordsCapitalizer.java | 11 +++ .../main/java/com/baeldung/model/Backup.java | 23 ++++++ .../java/com/baeldung/model/InputMessage.java | 44 ++++++++++ .../schema/BackupSerializationSchema.java | 29 +++++++ .../InputMessageDeserializationSchema.java | 33 ++++++++ 11 files changed, 364 insertions(+) create mode 100644 apache-flink/pom.xml create mode 100644 apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/Consumers.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/Producers.java create mode 100644 apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java create mode 100644 apache-flink/src/main/java/com/baeldung/model/Backup.java create mode 100644 apache-flink/src/main/java/com/baeldung/model/InputMessage.java create mode 100644 apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java create mode 100644 apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java diff --git a/apache-flink/pom.xml b/apache-flink/pom.xml new file mode 100644 index 0000000000..c93ea3ef77 --- /dev/null +++ b/apache-flink/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + groupId + apache-flink + 1.0-SNAPSHOT + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + org.apache.flink + flink-core + 1.5.0 + + + org.apache.flink + flink-connector-kafka-0.11_2.11 + 1.5.0 + + + org.apache.flink + flink-streaming-java_2.11 + 1.5.0 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.9.6 + + + \ No newline at end of file diff --git a/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java new file mode 100644 index 0000000000..d4e4cec60b --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java @@ -0,0 +1,81 @@ +package com.baeldung; + +import com.baeldung.flink.BackupAggregator; +import com.baeldung.flink.InputMessageTimestampAssigner; +import com.baeldung.flink.WordsCapitalizer; +import com.baeldung.model.Backup; +import com.baeldung.model.InputMessage; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; + +import static com.baeldung.flink.Consumers.*; +import static com.baeldung.flink.Producers.*; + +public class FlinkDataPipeline { + + public static void capitalize() throws Exception { + String inputTopic = "flink_input"; + String outputTopic = "flink_output"; + String consumerGroup = "baeldung"; + String address = "localhost:9092"; + + StreamExecutionEnvironment environment = + StreamExecutionEnvironment.getExecutionEnvironment(); + + FlinkKafkaConsumer011 flinkKafkaConsumer = + createStringConsumerForTopic(inputTopic, address, consumerGroup); + flinkKafkaConsumer.setStartFromEarliest(); + + DataStream stringInputStream = + environment.addSource(flinkKafkaConsumer); + + FlinkKafkaProducer011 flinkKafkaProducer = + createStringProducer(outputTopic, address); + + stringInputStream + .map(new WordsCapitalizer()) + .addSink(flinkKafkaProducer); + + environment.execute(); + } + +public static void createBackup () throws Exception { + String inputTopic = "flink_input"; + String outputTopic = "flink_output"; + String consumerGroup = "baeldung"; + String kafkaAddress = "192.168.99.100:9092"; + + StreamExecutionEnvironment environment = + StreamExecutionEnvironment.getExecutionEnvironment(); + + environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + FlinkKafkaConsumer011 flinkKafkaConsumer = + createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); + flinkKafkaConsumer.setStartFromEarliest(); + + flinkKafkaConsumer + .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()); + FlinkKafkaProducer011 flinkKafkaProducer = + createBackupProducer(outputTopic, kafkaAddress); + + DataStream inputMessagesStream = + environment.addSource(flinkKafkaConsumer); + + inputMessagesStream + .timeWindowAll(Time.hours(24)) + .aggregate(new BackupAggregator()) + .addSink(flinkKafkaProducer); + + environment.execute(); +} + + public static void main(String[] args) throws Exception { + createBackup(); + } + +} diff --git a/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java new file mode 100644 index 0000000000..2bfbb1e270 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java @@ -0,0 +1,34 @@ +package com.baeldung.flink; + +import com.baeldung.model.Backup; +import com.baeldung.model.InputMessage; +import org.apache.flink.api.common.functions.AggregateFunction; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + + public class BackupAggregator implements AggregateFunction, Backup> { + @Override + public List createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List add(InputMessage inputMessage, List inputMessages) { + inputMessages.add(inputMessage); + return inputMessages; + } + + @Override + public Backup getResult(List inputMessages) { + Backup backup = new Backup(inputMessages, LocalDateTime.now()); + return backup; + } + + @Override + public List merge(List inputMessages, List acc1) { + inputMessages.addAll(acc1); + return inputMessages; + } + } diff --git a/apache-flink/src/main/java/com/baeldung/flink/Consumers.java b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java new file mode 100644 index 0000000000..e9aa78b677 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java @@ -0,0 +1,32 @@ +package com.baeldung.flink; + +import com.baeldung.schema.InputMessageDeserializationSchema; +import com.baeldung.model.InputMessage; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; + +import java.util.Properties; + +public class Consumers { + +public static FlinkKafkaConsumer011 createStringConsumerForTopic( + String topic, String kafkaAddress, String kafkaGroup ) { + Properties props = new Properties(); + props.setProperty("bootstrap.servers", kafkaAddress); + props.setProperty("group.id",kafkaGroup); + FlinkKafkaConsumer011 consumer = + new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props); + + return consumer; +} + + public static FlinkKafkaConsumer011 createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", kafkaAddress); + properties.setProperty("group.id",kafkaGroup); + FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( + topic, new InputMessageDeserializationSchema(),properties); + + return consumer; + } +} diff --git a/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java new file mode 100644 index 0000000000..5d58cb36cc --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java @@ -0,0 +1,23 @@ +package com.baeldung.flink; + +import com.baeldung.model.InputMessage; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; + +import javax.annotation.Nullable; +import java.time.ZoneId; + +public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks { + + @Override + public long extractTimestamp(InputMessage element, long previousElementTimestamp) { + ZoneId zoneId = ZoneId.systemDefault(); + return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; + } + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp - 15); + } +} diff --git a/apache-flink/src/main/java/com/baeldung/flink/Producers.java b/apache-flink/src/main/java/com/baeldung/flink/Producers.java new file mode 100644 index 0000000000..094b6ff211 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/flink/Producers.java @@ -0,0 +1,17 @@ +package com.baeldung.flink; + +import com.baeldung.schema.BackupSerializationSchema; +import com.baeldung.model.Backup; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; + +public class Producers { + + public static FlinkKafkaProducer011 createStringProducer(String topic, String kafkaAddress) { + return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema()); + } + + public static FlinkKafkaProducer011 createBackupProducer(String topic, String kafkaAddress) { + return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema()); + } +} diff --git a/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java new file mode 100644 index 0000000000..49fffc292e --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java @@ -0,0 +1,11 @@ +package com.baeldung.flink; + +import org.apache.flink.api.common.functions.MapFunction; + +public class WordsCapitalizer implements MapFunction { + + @Override + public String map(String s) { + return s.toUpperCase(); + } +} diff --git a/apache-flink/src/main/java/com/baeldung/model/Backup.java b/apache-flink/src/main/java/com/baeldung/model/Backup.java new file mode 100644 index 0000000000..3a57d65d78 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/model/Backup.java @@ -0,0 +1,23 @@ +package com.baeldung.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; + +public class Backup { + + @JsonProperty("inputMessages") + List inputMessages; + @JsonProperty("backupTimestamp") + LocalDateTime backupTimestamp; + @JsonProperty("uuid") + UUID uuid; + + public Backup(List inputMessages, LocalDateTime backupTimestamp) { + this.inputMessages = inputMessages; + this.backupTimestamp = backupTimestamp; + this.uuid = UUID.randomUUID(); + } +} diff --git a/apache-flink/src/main/java/com/baeldung/model/InputMessage.java b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java new file mode 100644 index 0000000000..57a85de81a --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java @@ -0,0 +1,44 @@ +package com.baeldung.model; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.time.LocalDateTime; + +@JsonSerialize +public class InputMessage { + String sender; + String recipient; + LocalDateTime sentAt; + String message; + + public String getSender() { + return sender; + } + + public void setSender(String sender) { + this.sender = sender; + } + + public String getRecipient() { + return recipient; + } + + public void setRecipient(String recipient) { + this.recipient = recipient; + } + + public LocalDateTime getSentAt() { + return sentAt; + } + + public void setSentAt(LocalDateTime sentAt) { + this.sentAt = sentAt; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java new file mode 100644 index 0000000000..fa525a5ee8 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java @@ -0,0 +1,29 @@ +package com.baeldung.schema; + +import com.baeldung.model.Backup; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BackupSerializationSchema + implements SerializationSchema { + + ObjectMapper objectMapper; + Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); + + @Override + public byte[] serialize(Backup backupMessage) { + if(objectMapper == null) { + objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + } + try { + String json = objectMapper.writeValueAsString(backupMessage); + return json.getBytes(); + } catch (com.fasterxml.jackson.core.JsonProcessingException e) { + logger.error("Failed to parse JSON", e); + } + return new byte[0]; + } +} diff --git a/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java new file mode 100644 index 0000000000..99b6baa6f4 --- /dev/null +++ b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.baeldung.schema; + +import com.baeldung.model.InputMessage; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +public class InputMessageDeserializationSchema implements + DeserializationSchema { + + ObjectMapper objectMapper; + + @Override + public InputMessage deserialize(byte[] bytes) throws IOException { + if(objectMapper == null) { + objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + } + return objectMapper.readValue(bytes, InputMessage.class); + } + + @Override + public boolean isEndOfStream(InputMessage inputMessage) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(InputMessage.class); + } +} From 610d0ce869f81391aa3032c66e49203fe8abb1a0 Mon Sep 17 00:00:00 2001 From: DomWos Date: Sat, 18 Aug 2018 20:20:29 +0200 Subject: [PATCH 2/4] BAEL-1994: Moved the code to libraries module --- apache-flink/pom.xml | 37 ------------------- libraries/pom.xml | 18 ++++++++- .../baeldung/flink}/FlinkDataPipeline.java | 0 .../baeldung/flink/connector}/Consumers.java | 0 .../baeldung/flink/connector}/Producers.java | 0 .../com/baeldung/flink}/model/Backup.java | 0 .../baeldung/flink}/model/InputMessage.java | 0 .../flink/operator}/BackupAggregator.java | 0 .../InputMessageTimestampAssigner.java | 0 .../flink/operator}/WordsCapitalizer.java | 0 .../schema/BackupSerializationSchema.java | 0 .../InputMessageDeserializationSchema.java | 0 12 files changed, 17 insertions(+), 38 deletions(-) delete mode 100644 apache-flink/pom.xml rename {apache-flink/src/main/java/com/baeldung => libraries/src/main/java/com/baeldung/flink}/FlinkDataPipeline.java (100%) rename {apache-flink/src/main/java/com/baeldung/flink => libraries/src/main/java/com/baeldung/flink/connector}/Consumers.java (100%) rename {apache-flink/src/main/java/com/baeldung/flink => libraries/src/main/java/com/baeldung/flink/connector}/Producers.java (100%) rename {apache-flink/src/main/java/com/baeldung => libraries/src/main/java/com/baeldung/flink}/model/Backup.java (100%) rename {apache-flink/src/main/java/com/baeldung => libraries/src/main/java/com/baeldung/flink}/model/InputMessage.java (100%) rename {apache-flink/src/main/java/com/baeldung/flink => libraries/src/main/java/com/baeldung/flink/operator}/BackupAggregator.java (100%) rename {apache-flink/src/main/java/com/baeldung/flink => libraries/src/main/java/com/baeldung/flink/operator}/InputMessageTimestampAssigner.java (100%) rename {apache-flink/src/main/java/com/baeldung/flink => libraries/src/main/java/com/baeldung/flink/operator}/WordsCapitalizer.java (100%) rename {apache-flink/src/main/java/com/baeldung => libraries/src/main/java/com/baeldung/flink}/schema/BackupSerializationSchema.java (100%) rename {apache-flink/src/main/java/com/baeldung => libraries/src/main/java/com/baeldung/flink}/schema/InputMessageDeserializationSchema.java (100%) diff --git a/apache-flink/pom.xml b/apache-flink/pom.xml deleted file mode 100644 index c93ea3ef77..0000000000 --- a/apache-flink/pom.xml +++ /dev/null @@ -1,37 +0,0 @@ - - - 4.0.0 - - groupId - apache-flink - 1.0-SNAPSHOT - - com.baeldung - parent-modules - 1.0.0-SNAPSHOT - - - - org.apache.flink - flink-core - 1.5.0 - - - org.apache.flink - flink-connector-kafka-0.11_2.11 - 1.5.0 - - - org.apache.flink - flink-streaming-java_2.11 - 1.5.0 - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - 2.9.6 - - - \ No newline at end of file diff --git a/libraries/pom.xml b/libraries/pom.xml index b19a005d94..047b56dd01 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -169,6 +169,17 @@ commons-dbutils ${commons.dbutils.version} + + org.apache.flink + flink-connector-kafka-0.11_2.11 + ${flink.version} + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + + org.apache.flink flink-core @@ -243,6 +254,11 @@ jackson-databind ${jackson.version} + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + org.datanucleus @@ -947,7 +963,7 @@ 4.5.3 2.5 - 1.2.0 + 1.5.0 2.8.5 2.92 1.9.26 diff --git a/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java rename to libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java diff --git a/apache-flink/src/main/java/com/baeldung/flink/Consumers.java b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/flink/Consumers.java rename to libraries/src/main/java/com/baeldung/flink/connector/Consumers.java diff --git a/apache-flink/src/main/java/com/baeldung/flink/Producers.java b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/flink/Producers.java rename to libraries/src/main/java/com/baeldung/flink/connector/Producers.java diff --git a/apache-flink/src/main/java/com/baeldung/model/Backup.java b/libraries/src/main/java/com/baeldung/flink/model/Backup.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/model/Backup.java rename to libraries/src/main/java/com/baeldung/flink/model/Backup.java diff --git a/apache-flink/src/main/java/com/baeldung/model/InputMessage.java b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/model/InputMessage.java rename to libraries/src/main/java/com/baeldung/flink/model/InputMessage.java diff --git a/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java rename to libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java diff --git a/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java rename to libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java diff --git a/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java rename to libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java diff --git a/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java rename to libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java diff --git a/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java similarity index 100% rename from apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java rename to libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java From 1921b585a62ad3fcfe989610c64f091161cd4ef3 Mon Sep 17 00:00:00 2001 From: DomWos Date: Fri, 24 Aug 2018 11:14:40 +0200 Subject: [PATCH 3/4] BAEL-1994: Updates and small changes --- libraries/pom.xml | 2 +- .../com/baeldung/flink/FlinkDataPipeline.java | 17 +-- .../baeldung/flink/connector/Consumers.java | 6 +- .../baeldung/flink/connector/Producers.java | 8 +- .../java/com/baeldung/flink/model/Backup.java | 6 +- .../baeldung/flink/model/InputMessage.java | 30 ++++- .../flink/operator/BackupAggregator.java | 6 +- .../InputMessageTimestampAssigner.java | 4 +- .../flink/operator/WordsCapitalizer.java | 2 +- .../schema/BackupSerializationSchema.java | 4 +- .../InputMessageDeserializationSchema.java | 4 +- .../flink/BackupCreatorIntegrationTest.java | 103 ++++++++++++++++++ .../flink/WordCapitalizerIntegrationTest.java | 34 ++++++ 13 files changed, 198 insertions(+), 28 deletions(-) create mode 100644 libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java create mode 100644 libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java diff --git a/libraries/pom.xml b/libraries/pom.xml index 047b56dd01..d129844543 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -204,7 +204,7 @@ org.apache.flink - flink-test-utils_2.10 + flink-test-utils_2.11 ${flink.version} test diff --git a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java index d4e4cec60b..423637bf53 100644 --- a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java +++ b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java @@ -1,10 +1,11 @@ -package com.baeldung; +package com.baeldung.flink; -import com.baeldung.flink.BackupAggregator; -import com.baeldung.flink.InputMessageTimestampAssigner; -import com.baeldung.flink.WordsCapitalizer; -import com.baeldung.model.Backup; -import com.baeldung.model.InputMessage; + +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.model.InputMessage; +import com.baeldung.flink.operator.BackupAggregator; +import com.baeldung.flink.operator.InputMessageTimestampAssigner; +import com.baeldung.flink.operator.WordsCapitalizer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -12,8 +13,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; -import static com.baeldung.flink.Consumers.*; -import static com.baeldung.flink.Producers.*; +import static com.baeldung.flink.connector.Consumers.*; +import static com.baeldung.flink.connector.Producers.*; public class FlinkDataPipeline { diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java index e9aa78b677..514085f9c4 100644 --- a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java +++ b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java @@ -1,7 +1,7 @@ -package com.baeldung.flink; +package com.baeldung.flink.connector; -import com.baeldung.schema.InputMessageDeserializationSchema; -import com.baeldung.model.InputMessage; +import com.baeldung.flink.model.InputMessage; +import com.baeldung.flink.schema.InputMessageDeserializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java index 094b6ff211..8e6f3f8f37 100644 --- a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java +++ b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java @@ -1,7 +1,7 @@ -package com.baeldung.flink; +package com.baeldung.flink.connector; -import com.baeldung.schema.BackupSerializationSchema; -import com.baeldung.model.Backup; +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.schema.BackupSerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; @@ -12,6 +12,6 @@ public class Producers { } public static FlinkKafkaProducer011 createBackupProducer(String topic, String kafkaAddress) { - return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema()); + return new FlinkKafkaProducer011(kafkaAddress, topic, new BackupSerializationSchema()); } } diff --git a/libraries/src/main/java/com/baeldung/flink/model/Backup.java b/libraries/src/main/java/com/baeldung/flink/model/Backup.java index 3a57d65d78..268ceec7f3 100644 --- a/libraries/src/main/java/com/baeldung/flink/model/Backup.java +++ b/libraries/src/main/java/com/baeldung/flink/model/Backup.java @@ -1,4 +1,4 @@ -package com.baeldung.model; +package com.baeldung.flink.model; import com.fasterxml.jackson.annotation.JsonProperty; @@ -20,4 +20,8 @@ public class Backup { this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID(); } + + public List getInputMessages() { + return inputMessages; + } } diff --git a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java index 57a85de81a..183fa69c11 100644 --- a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java +++ b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java @@ -1,6 +1,8 @@ -package com.baeldung.model; +package com.baeldung.flink.model; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.base.Objects; + import java.time.LocalDateTime; @JsonSerialize @@ -10,6 +12,9 @@ public class InputMessage { LocalDateTime sentAt; String message; + public InputMessage() { + } + public String getSender() { return sender; } @@ -41,4 +46,27 @@ public class InputMessage { public void setMessage(String message) { this.message = message; } + + public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) { + this.sender = sender; + this.recipient = recipient; + this.sentAt = sentAt; + this.message = message; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + InputMessage message1 = (InputMessage) o; + return Objects.equal(sender, message1.sender) && + Objects.equal(recipient, message1.recipient) && + Objects.equal(sentAt, message1.sentAt) && + Objects.equal(message, message1.message); + } + + @Override + public int hashCode() { + return Objects.hashCode(sender, recipient, sentAt, message); + } } diff --git a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java index 2bfbb1e270..c39b8413d1 100644 --- a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java +++ b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java @@ -1,7 +1,7 @@ -package com.baeldung.flink; +package com.baeldung.flink.operator; -import com.baeldung.model.Backup; -import com.baeldung.model.InputMessage; +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.model.InputMessage; import org.apache.flink.api.common.functions.AggregateFunction; import java.time.LocalDateTime; diff --git a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java index 5d58cb36cc..05828d9588 100644 --- a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java +++ b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java @@ -1,6 +1,6 @@ -package com.baeldung.flink; +package com.baeldung.flink.operator; -import com.baeldung.model.InputMessage; +import com.baeldung.flink.model.InputMessage; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; diff --git a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java index 49fffc292e..f9103d225c 100644 --- a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java +++ b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java @@ -1,4 +1,4 @@ -package com.baeldung.flink; +package com.baeldung.flink.operator; import org.apache.flink.api.common.functions.MapFunction; diff --git a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java index fa525a5ee8..4db9556d8d 100644 --- a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java +++ b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java @@ -1,6 +1,6 @@ -package com.baeldung.schema; +package com.baeldung.flink.schema; -import com.baeldung.model.Backup; +import com.baeldung.flink.model.Backup; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.flink.api.common.serialization.SerializationSchema; diff --git a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java index 99b6baa6f4..3c81b67ec1 100644 --- a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java +++ b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java @@ -1,6 +1,6 @@ -package com.baeldung.schema; +package com.baeldung.flink.schema; -import com.baeldung.model.InputMessage; +import com.baeldung.flink.model.InputMessage; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java new file mode 100644 index 0000000000..ab7d119c16 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java @@ -0,0 +1,103 @@ +package com.baeldung.flink; + +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.model.InputMessage; +import com.baeldung.flink.operator.BackupAggregator; +import com.baeldung.flink.operator.InputMessageTimestampAssigner; +import com.baeldung.flink.schema.BackupSerializationSchema; +import com.baeldung.flink.schema.InputMessageDeserializationSchema; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.commons.collections.ListUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class BackupCreatorIntegrationTest { + public static ObjectMapper mapper; + + @Before + public void setup() { + mapper = new ObjectMapper().registerModule(new JavaTimeModule()); + } + + @Test + public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException { + InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message"); + byte[] messageSerialized = mapper.writeValueAsBytes(message); + DeserializationSchema deserializationSchema = new InputMessageDeserializationSchema(); + InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized); + + assertEquals(message, messageDeserialized); + } + + @Test + public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUser_thenMessagesAreGroupedProperly() throws Exception { + LocalDateTime currentTime = LocalDateTime.now(); + InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage"); + InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage"); + InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage"); + InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage"); + InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage"); + InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage"); + + List firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage); + List secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage); + List inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + DataStreamSource testDataSet = env.fromCollection(inputMessages); + CollectingSink sink = new CollectingSink(); + testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()) + .timeWindowAll(Time.hours(24)) + .aggregate(new BackupAggregator()) + .addSink(sink); + + env.execute(); + + Awaitility.await().until(() -> sink.backups.size() == 2); + assertEquals(2, sink.backups.size()); + assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages()); + assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages()); + + } + + @Test + public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException { + InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message"); + List messages = Arrays.asList(message); + Backup backup = new Backup(messages, LocalDateTime.now()); + byte[] backupSerialized = mapper.writeValueAsBytes(backup); + SerializationSchema serializationSchema = new BackupSerializationSchema(); + byte[] backupProcessed = serializationSchema.serialize(backup); + + assertEquals(backupSerialized, backupProcessed); + } + + private static class CollectingSink implements SinkFunction { + + public static List backups = new ArrayList<>(); + + @Override + public synchronized void invoke(Backup value, Context context) throws Exception { + backups.add(value); + } + } +} diff --git a/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java new file mode 100644 index 0000000000..8a98dae4b5 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java @@ -0,0 +1,34 @@ +package com.baeldung.flink; + +import com.baeldung.flink.operator.WordsCapitalizer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class WordCapitalizerIntegrationTest { + + @Test + public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + List data = Arrays.asList("dog", "cat", "wolf", "pig"); + + DataSet testDataSet = env.fromCollection(data); + + + List dataProcessed = testDataSet + .map(new WordsCapitalizer()) + .collect(); + + List testDataCapitalized = data.stream() + .map(String::toUpperCase) + .collect(Collectors.toList()); + + Assert.assertEquals(testDataCapitalized, dataProcessed); + } + +} From be81f87c3cb93108d24e69ac2f76d2660ad637af Mon Sep 17 00:00:00 2001 From: DomWos Date: Mon, 3 Sep 2018 12:08:45 +0200 Subject: [PATCH 4/4] Switched fields to be static to remove initialization in method. --- .../main/java/com/baeldung/flink/FlinkDataPipeline.java | 2 +- .../baeldung/flink/schema/BackupSerializationSchema.java | 6 +++++- .../flink/schema/InputMessageDeserializationSchema.java | 9 +++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java index 423637bf53..d02b1bcb83 100644 --- a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java +++ b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java @@ -48,7 +48,7 @@ public static void createBackup () throws Exception { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; - String kafkaAddress = "192.168.99.100:9092"; + String kafkaAddress = "localhost:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java index 4db9556d8d..967b266bb6 100644 --- a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java +++ b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java @@ -1,6 +1,8 @@ package com.baeldung.flink.schema; import com.baeldung.flink.model.Backup; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -10,12 +12,14 @@ import org.slf4j.LoggerFactory; public class BackupSerializationSchema implements SerializationSchema { - ObjectMapper objectMapper; + static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); @Override public byte[] serialize(Backup backupMessage) { if(objectMapper == null) { + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); } try { diff --git a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java index 3c81b67ec1..1df456bbe5 100644 --- a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java +++ b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java @@ -1,6 +1,8 @@ package com.baeldung.flink.schema; import com.baeldung.flink.model.InputMessage; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -11,13 +13,12 @@ import java.io.IOException; public class InputMessageDeserializationSchema implements DeserializationSchema { - ObjectMapper objectMapper; + static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + @Override public InputMessage deserialize(byte[] bytes) throws IOException { - if(objectMapper == null) { - objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); - } + return objectMapper.readValue(bytes, InputMessage.class); }