diff --git a/apache-flink/pom.xml b/apache-flink/pom.xml
new file mode 100644
index 0000000000..c93ea3ef77
--- /dev/null
+++ b/apache-flink/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.baeldung</groupId>
+    <artifactId>apache-flink</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-modules</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>2.9.6</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java
new file mode 100644
index 0000000000..d4e4cec60b
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java
@@ -0,0 +1,85 @@
+package com.baeldung;
+
+import com.baeldung.flink.BackupAggregator;
+import com.baeldung.flink.InputMessageTimestampAssigner;
+import com.baeldung.flink.WordsCapitalizer;
+import com.baeldung.model.Backup;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+import static com.baeldung.flink.Consumers.*;
+import static com.baeldung.flink.Producers.*;
+
+public class FlinkDataPipeline {
+
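+    // Simplest pipeline: consume strings from Kafka, capitalize them, publish them back.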
+    public static void capitalize() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        String address = "localhost:9092";
+
+        StreamExecutionEnvironment environment =
+          StreamExecutionEnvironment.getExecutionEnvironment();
+
+        FlinkKafkaConsumer011<String> flinkKafkaConsumer =
+          createStringConsumerForTopic(inputTopic, address, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+
+        DataStream<String> stringInputStream =
+          environment.addSource(flinkKafkaConsumer);
+
+        FlinkKafkaProducer011<String> flinkKafkaProducer =
+          createStringProducer(outputTopic, address);
+
+        stringInputStream
+          .map(new WordsCapitalizer())
+          .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
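+    // Event-time pipeline: aggregate 24 hours of InputMessages into one Backup message.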
+    public static void createBackup() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        // Assumes a docker-machine style broker address; adjust it to match your Kafka setup.
+        String kafkaAddress = "192.168.99.100:9092";
+
+        StreamExecutionEnvironment environment =
+          StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Windows are driven by each message's sentAt field, not by arrival time.
+        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
+          createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+
+        flinkKafkaConsumer
+          .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
+        FlinkKafkaProducer011<Backup> flinkKafkaProducer =
+          createBackupProducer(outputTopic, kafkaAddress);
+
+        DataStream<InputMessage> inputMessagesStream =
+          environment.addSource(flinkKafkaConsumer);
+
+        inputMessagesStream
+          .timeWindowAll(Time.hours(24))
+          .aggregate(new BackupAggregator())
+          .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
+    public static void main(String[] args) throws Exception {
+        createBackup();
+    }
+
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java
new file mode 100644
index 0000000000..2bfbb1e270
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java
@@ -0,0 +1,35 @@
+package com.baeldung.flink;
+
+import com.baeldung.model.Backup;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
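+// Window accumulator: collects each window's InputMessages in a List, then wraps them in a Backup.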
+public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
+    @Override
+    public List<InputMessage> createAccumulator() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
+        inputMessages.add(inputMessage);
+        return inputMessages;
+    }
+
+    @Override
+    public Backup getResult(List<InputMessage> inputMessages) {
+        Backup backup = new Backup(inputMessages, LocalDateTime.now());
+        return backup;
+    }
+
+    @Override
+    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
+        inputMessages.addAll(acc1);
+        return inputMessages;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/Consumers.java b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java
new file mode 100644
index 0000000000..e9aa78b677
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/Consumers.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink;
+
+import com.baeldung.schema.InputMessageDeserializationSchema;
+import com.baeldung.model.InputMessage;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+
+import java.util.Properties;
+
+public class Consumers {
+
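+    // Factories for Kafka 0.11 consumers: one for plain strings, one for JSON-encoded InputMessages.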
+    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
+      String topic, String kafkaAddress, String kafkaGroup) {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", kafkaAddress);
+        props.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<String> consumer =
+          new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
+
+        return consumer;
+    }
+
+    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(
+      String topic, String kafkaAddress, String kafkaGroup) {
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafkaAddress);
+        properties.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<>(
+          topic, new InputMessageDeserializationSchema(), properties);
+
+        return consumer;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java
new file mode 100644
index 0000000000..5d58cb36cc
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java
@@ -0,0 +1,24 @@
+package com.baeldung.flink;
+
+import com.baeldung.model.InputMessage;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+
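+// Assigns event timestamps from each message's sentAt field and emits a punctuated watermark 15 ms behind it.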
+public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {
+
+    @Override
+    public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
+        ZoneId zoneId = ZoneId.systemDefault();
+        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; // epoch millis, at one-second precision
+    }
+
+    @Nullable
+    @Override
+    public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
+        return new Watermark(extractedTimestamp - 15);
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/Producers.java b/apache-flink/src/main/java/com/baeldung/flink/Producers.java
new file mode 100644
index 0000000000..094b6ff211
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/Producers.java
@@ -0,0 +1,18 @@
+package com.baeldung.flink;
+
+import com.baeldung.schema.BackupSerializationSchema;
+import com.baeldung.model.Backup;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+public class Producers {
+
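+    // Producer factories mirroring the consumers: strings for the capitalizer, JSON Backups for the backup job.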
+    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
+    }
+
+    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java
new file mode 100644
index 0000000000..49fffc292e
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java
@@ -0,0 +1,11 @@
+package com.baeldung.flink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class WordsCapitalizer implements MapFunction<String, String> {
+
+    @Override
+    public String map(String s) {
+        return s.toUpperCase();
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/model/Backup.java b/apache-flink/src/main/java/com/baeldung/model/Backup.java
new file mode 100644
index 0000000000..3a57d65d78
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/model/Backup.java
@@ -0,0 +1,24 @@
+package com.baeldung.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
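+// Payload written to the output topic: one window's messages plus a timestamp and a random UUID.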
+public class Backup {
+
+    @JsonProperty("inputMessages")
+    List<InputMessage> inputMessages;
+    @JsonProperty("backupTimestamp")
+    LocalDateTime backupTimestamp;
+    @JsonProperty("uuid")
+    UUID uuid;
+
+    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
+        this.inputMessages = inputMessages;
+        this.backupTimestamp = backupTimestamp;
+        this.uuid = UUID.randomUUID();
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/model/InputMessage.java b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java
new file mode 100644
index 0000000000..57a85de81a
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/model/InputMessage.java
@@ -0,0 +1,44 @@
+package com.baeldung.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.time.LocalDateTime;
+
+@JsonSerialize
+public class InputMessage {
+    String sender;
+    String recipient;
+    LocalDateTime sentAt;
+    String message;
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipient() {
+        return recipient;
+    }
+
+    public void setRecipient(String recipient) {
+        this.recipient = recipient;
+    }
+
+    public LocalDateTime getSentAt() {
+        return sentAt;
+    }
+
+    public void setSentAt(LocalDateTime sentAt) {
+        this.sentAt = sentAt;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java
new file mode 100644
index 0000000000..fa525a5ee8
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java
@@ -0,0 +1,32 @@
+package com.baeldung.schema;
+
+import com.baeldung.model.Backup;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+
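+// Serializes Backup objects to JSON bytes for the Kafka producer.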
+public class BackupSerializationSchema implements SerializationSchema<Backup> {
+
+    ObjectMapper objectMapper;
+    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
+
+    @Override
+    public byte[] serialize(Backup backupMessage) {
+        // ObjectMapper is created lazily so the schema stays serializable when Flink ships it to workers.
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+        }
+        try {
+            String json = objectMapper.writeValueAsString(backupMessage);
+            return json.getBytes(StandardCharsets.UTF_8);
+        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
+            logger.error("Failed to serialize Backup to JSON", e);
+        }
+        return new byte[0];
+    }
+}
diff --git a/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java
new file mode 100644
index 0000000000..99b6baa6f4
--- /dev/null
+++ b/apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java
@@ -0,0 +1,34 @@
+package com.baeldung.schema;
+
+import com.baeldung.model.InputMessage;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
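+// Deserializes Kafka record bytes into InputMessage objects via Jackson.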
+public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
+
+    ObjectMapper objectMapper;
+
+    @Override
+    public InputMessage deserialize(byte[] bytes) throws IOException {
+        // Lazy initialization keeps the schema serializable when Flink distributes it.
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+        }
+        return objectMapper.readValue(bytes, InputMessage.class);
+    }
+
+    @Override
+    public boolean isEndOfStream(InputMessage inputMessage) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<InputMessage> getProducedType() {
+        return TypeInformation.of(InputMessage.class);
+    }
+}