diff --git a/libraries/pom.xml b/libraries/pom.xml
index 80e3303ba5..c8135d8d2a 100644
--- a/libraries/pom.xml
+++ b/libraries/pom.xml
@@ -154,6 +154,17 @@
            <artifactId>commons-dbutils</artifactId>
            <version>${commons.dbutils.version}</version>
        </dependency>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
+           <version>${flink.version}</version>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-java_2.11</artifactId>
+           <version>${flink.version}</version>
+       </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
@@ -178,7 +189,7 @@
        <dependency>
            <groupId>org.apache.flink</groupId>
-           <artifactId>flink-test-utils_2.10</artifactId>
+           <artifactId>flink-test-utils_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
@@ -228,6 +239,11 @@
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
+       <dependency>
+           <groupId>com.fasterxml.jackson.datatype</groupId>
+           <artifactId>jackson-datatype-jsr310</artifactId>
+           <version>${jackson.version}</version>
+       </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
@@ -902,7 +918,7 @@
        4.5.3
        2.5
-       <flink.version>1.2.0</flink.version>
+       <flink.version>1.5.0</flink.version>
        2.8.5
        2.92
        1.9.26
diff --git a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
new file mode 100644
index 0000000000..d02b1bcb83
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
@@ -0,0 +1,82 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.operator.WordsCapitalizer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+import static com.baeldung.flink.connector.Consumers.*;
+import static com.baeldung.flink.connector.Producers.*;
+
+public class FlinkDataPipeline {
+
+    public static void capitalize() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        String address = "localhost:9092";
+
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+
+        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
+
+        FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
+
+        stringInputStream
+            .map(new WordsCapitalizer())
+            .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
+    public static void createBackup() throws Exception {
+        String inputTopic = "flink_input";
+        String outputTopic = "flink_output";
+        String consumerGroup = "baeldung";
+        String kafkaAddress = "localhost:9092";
+
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
+        flinkKafkaConsumer.setStartFromEarliest();
+        flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
+
+        FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
+
+        DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
+
+        inputMessagesStream
+            .timeWindowAll(Time.hours(24))
+            .aggregate(new BackupAggregator())
+            .addSink(flinkKafkaProducer);
+
+        environment.execute();
+    }
+
+    public static void main(String[] args) throws Exception {
+        createBackup();
+    }
+
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
new file mode 100644
index 0000000000..514085f9c4
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
@@ -0,0 +1,32 @@
+package com.baeldung.flink.connector;
+
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+
+import java.util.Properties;
+
+public class Consumers {
+
+    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", kafkaAddress);
+        props.setProperty("group.id", kafkaGroup);
+        return new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
+    }
+
+    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafkaAddress);
+        properties.setProperty("group.id", kafkaGroup);
+        return new FlinkKafkaConsumer011<>(topic, new InputMessageDeserializationSchema(), properties);
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
new file mode 100644
index 0000000000..8e6f3f8f37
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
@@ -0,0 +1,17 @@
+package com.baeldung.flink.connector;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.schema.BackupSerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+public class Producers {
+
+    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
+    }
+
+    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
+        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/Backup.java b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
new file mode 100644
index 0000000000..268ceec7f3
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
@@ -0,0 +1,27 @@
+package com.baeldung.flink.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
+public class Backup {
+
+    @JsonProperty("inputMessages")
+    List<InputMessage> inputMessages;
+    @JsonProperty("backupTimestamp")
+    LocalDateTime backupTimestamp;
+    @JsonProperty("uuid")
+    UUID uuid;
+
+    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
+        this.inputMessages = inputMessages;
+        this.backupTimestamp = backupTimestamp;
+        this.uuid = UUID.randomUUID();
+    }
+
+    public List<InputMessage> getInputMessages() {
+        return inputMessages;
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
new file mode 100644
index 0000000000..183fa69c11
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
@@ -0,0 +1,72 @@
+package com.baeldung.flink.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Objects;
+
+import java.time.LocalDateTime;
+
+@JsonSerialize
+public class InputMessage {
+
+    String sender;
+    String recipient;
+    LocalDateTime sentAt;
+    String message;
+
+    public InputMessage() {
+    }
+
+    public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
+        this.sender = sender;
+        this.recipient = recipient;
+        this.sentAt = sentAt;
+        this.message = message;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipient() {
+        return recipient;
+    }
+
+    public void setRecipient(String recipient) {
+        this.recipient = recipient;
+    }
+
+    public LocalDateTime getSentAt() {
+        return sentAt;
+    }
+
+    public void setSentAt(LocalDateTime sentAt) {
+        this.sentAt = sentAt;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        InputMessage that = (InputMessage) o;
+        return Objects.equal(sender, that.sender) &&
+            Objects.equal(recipient, that.recipient) &&
+            Objects.equal(sentAt, that.sentAt) &&
+            Objects.equal(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(sender, recipient, sentAt, message);
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
new file mode 100644
index 0000000000..c39b8413d1
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink.operator;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
+
+    @Override
+    public List<InputMessage> createAccumulator() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
+        inputMessages.add(inputMessage);
+        return inputMessages;
+    }
+
+    @Override
+    public Backup getResult(List<InputMessage> inputMessages) {
+        return new Backup(inputMessages, LocalDateTime.now());
+    }
+
+    @Override
+    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
+        inputMessages.addAll(acc1);
+        return inputMessages;
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
new file mode 100644
index 0000000000..05828d9588
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
@@ -0,0 +1,23 @@
+package com.baeldung.flink.operator;
+
+import com.baeldung.flink.model.InputMessage;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+
+public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {
+
+    @Override
+    public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
+        // Use the message's sentAt field, converted to epoch milliseconds, as the event timestamp
+        ZoneId zoneId = ZoneId.systemDefault();
+        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
+    }
+
+    @Nullable
+    @Override
+    public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
+        return new Watermark(extractedTimestamp - 15);
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
new file mode 100644
index 0000000000..f9103d225c
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
@@ -0,0 +1,11 @@
+package com.baeldung.flink.operator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class WordsCapitalizer implements MapFunction<String, String> {
+
+    @Override
+    public String map(String s) {
+        return s.toUpperCase();
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
new file mode 100644
index 0000000000..967b266bb6
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
@@ -0,0 +1,33 @@
+package com.baeldung.flink.schema;
+
+import com.baeldung.flink.model.Backup;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackupSerializationSchema implements SerializationSchema<Backup> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
+
+    // ObjectMapper is not Serializable, so keep it static rather than shipping it
+    // with the schema instance, and configure it once at class-load time.
+    static ObjectMapper objectMapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+
+    @Override
+    public byte[] serialize(Backup backupMessage) {
+        try {
+            return objectMapper.writeValueAsBytes(backupMessage);
+        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
+            logger.error("Failed to serialize Backup to JSON", e);
+            return new byte[0];
+        }
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
new file mode 100644
index 0000000000..1df456bbe5
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink.schema;
+
+import com.baeldung.flink.model.InputMessage;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
+
+    static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+
+    @Override
+    public InputMessage deserialize(byte[] bytes) throws IOException {
+        return objectMapper.readValue(bytes, InputMessage.class);
+    }
+
+    @Override
+    public boolean isEndOfStream(InputMessage inputMessage) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<InputMessage> getProducedType() {
+        return TypeInformation.of(InputMessage.class);
+    }
+}
diff --git a/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
new file mode 100644
index 0000000000..ab7d119c16
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
@@ -0,0 +1,103 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.schema.BackupSerializationSchema;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class BackupCreatorIntegrationTest {
+
+    public static ObjectMapper mapper;
+
+    @Before
+    public void setup() {
+        mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+    }
+
+    @Test
+    public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
+        InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
+        byte[] messageSerialized = mapper.writeValueAsBytes(message);
+        DeserializationSchema<InputMessage> deserializationSchema = new InputMessageDeserializationSchema();
+        InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);
+
+        assertEquals(message, messageDeserialized);
+    }
+
+    @Test
+    public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUsed_thenMessagesAreGroupedProperly() throws Exception {
+        LocalDateTime currentTime = LocalDateTime.now();
+        InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
+        InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
+        InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
+        InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
TestMessage"); + InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage"); + InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage"); + + List firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage); + List secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage); + List inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + DataStreamSource testDataSet = env.fromCollection(inputMessages); + CollectingSink sink = new CollectingSink(); + testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()) + .timeWindowAll(Time.hours(24)) + .aggregate(new BackupAggregator()) + .addSink(sink); + + env.execute(); + + Awaitility.await().until(() -> sink.backups.size() == 2); + assertEquals(2, sink.backups.size()); + assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages()); + assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages()); + + } + + @Test + public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException { + InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message"); + List messages = Arrays.asList(message); + Backup backup = new Backup(messages, LocalDateTime.now()); + byte[] backupSerialized = mapper.writeValueAsBytes(backup); + SerializationSchema serializationSchema = new BackupSerializationSchema(); + byte[] backupProcessed = serializationSchema.serialize(backup); + + assertEquals(backupSerialized, backupProcessed); + } + + private static class CollectingSink implements SinkFunction { + + public static List backups = new ArrayList<>(); + + @Override + public synchronized void invoke(Backup value, Context context) throws Exception { + backups.add(value); + } + } +} diff --git a/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java new file mode 100644 index 0000000000..8a98dae4b5 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java @@ -0,0 +1,34 @@ +package com.baeldung.flink; + +import com.baeldung.flink.operator.WordsCapitalizer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class WordCapitalizerIntegrationTest { + + @Test + public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + List data = Arrays.asList("dog", "cat", "wolf", "pig"); + + DataSet testDataSet = env.fromCollection(data); + + + List dataProcessed = testDataSet + .map(new WordsCapitalizer()) + .collect(); + + List testDataCapitalized = data.stream() + .map(String::toUpperCase) + .collect(Collectors.toList()); + + Assert.assertEquals(testDataCapitalized, dataProcessed); + } + +}