BAEL-1994: Flink and Kafka Data Pipeline

DomWos 2018-08-13 00:08:55 +02:00
parent 6e646bf0de
commit e828a0de9a
11 changed files with 364 additions and 0 deletions

37
apache-flink/pom.xml Normal file

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.baeldung</groupId>
    <artifactId>apache-flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.9.6</version>
        </dependency>
    </dependencies>
</project>

81
apache-flink/src/main/java/com/baeldung/FlinkDataPipeline.java Normal file

@@ -0,0 +1,81 @@
package com.baeldung;

import com.baeldung.flink.BackupAggregator;
import com.baeldung.flink.InputMessageTimestampAssigner;
import com.baeldung.flink.WordsCapitalizer;
import com.baeldung.model.Backup;
import com.baeldung.model.InputMessage;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.Consumers.*;
import static com.baeldung.flink.Producers.*;

public class FlinkDataPipeline {

    // Reads strings from the flink_input topic, uppercases them
    // and writes the result to the flink_output topic.
    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

        stringInputStream
          .map(new WordsCapitalizer())
          .addSink(flinkKafkaProducer);

        environment.execute();
    }

    // Groups incoming messages into 24-hour event-time windows and
    // writes one Backup object per window back to Kafka.
    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "192.168.99.100:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();
        flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);

        inputMessagesStream
          .timeWindowAll(Time.hours(24))
          .aggregate(new BackupAggregator())
          .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }
}

34
apache-flink/src/main/java/com/baeldung/flink/BackupAggregator.java Normal file

@@ -0,0 +1,34 @@
package com.baeldung.flink;

import com.baeldung.model.Backup;
import com.baeldung.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {

    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}

32
apache-flink/src/main/java/com/baeldung/flink/Consumers.java Normal file

@@ -0,0 +1,32 @@
package com.baeldung.flink;

import com.baeldung.model.InputMessage;
import com.baeldung.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Consumers {

    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
    }

    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer011<>(topic, new InputMessageDeserializationSchema(), properties);
    }
}

23
apache-flink/src/main/java/com/baeldung/flink/InputMessageTimestampAssigner.java Normal file

@@ -0,0 +1,23 @@
package com.baeldung.flink;

import com.baeldung.model.InputMessage;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;
import java.time.ZoneId;

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {

    @Override
    public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
        // Use the message's sentAt field, converted to epoch milliseconds, as the event time.
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
        // Emit a watermark slightly behind the latest timestamp to tolerate small out-of-orderness.
        return new Watermark(extractedTimestamp - 15);
    }
}

17
apache-flink/src/main/java/com/baeldung/flink/Producers.java Normal file

@@ -0,0 +1,17 @@
package com.baeldung.flink;

import com.baeldung.model.Backup;
import com.baeldung.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Producers {

    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
    }

    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
    }
}

11
apache-flink/src/main/java/com/baeldung/flink/WordsCapitalizer.java Normal file

@@ -0,0 +1,11 @@
package com.baeldung.flink;

import org.apache.flink.api.common.functions.MapFunction;

public class WordsCapitalizer implements MapFunction<String, String> {

    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

23
apache-flink/src/main/java/com/baeldung/model/Backup.java Normal file

@@ -0,0 +1,23 @@
package com.baeldung.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

public class Backup {

    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;

    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;

    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }
}

44
apache-flink/src/main/java/com/baeldung/model/InputMessage.java Normal file

@@ -0,0 +1,44 @@
package com.baeldung.model;

import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.time.LocalDateTime;

@JsonSerialize
public class InputMessage {

    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;

    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getRecipient() {
        return recipient;
    }

    public void setRecipient(String recipient) {
        this.recipient = recipient;
    }

    public LocalDateTime getSentAt() {
        return sentAt;
    }

    public void setSentAt(LocalDateTime sentAt) {
        this.sentAt = sentAt;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
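For context, a message on the flink_input topic would look roughly like the illustrative payload below (not part of the commit); the JavaTimeModule registered in the deserialization schema lets Jackson bind the ISO-8601 sentAt value to LocalDateTime:

{"sender":"alice","recipient":"bob","sentAt":"2018-08-13T00:08:55","message":"hello"}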

29
apache-flink/src/main/java/com/baeldung/schema/BackupSerializationSchema.java Normal file

@@ -0,0 +1,29 @@
package com.baeldung.schema;

import com.baeldung.model.Backup;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

public class BackupSerializationSchema implements SerializationSchema<Backup> {

    private static final Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    // Created lazily so the schema stays serializable when Flink ships it to the cluster.
    ObjectMapper objectMapper;

    @Override
    public byte[] serialize(Backup backupMessage) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
        }
        try {
            String json = objectMapper.writeValueAsString(backupMessage);
            return json.getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            logger.error("Failed to serialize Backup to JSON", e);
        }
        return new byte[0];
    }
}

33
apache-flink/src/main/java/com/baeldung/schema/InputMessageDeserializationSchema.java Normal file

@@ -0,0 +1,33 @@
package com.baeldung.schema;

import com.baeldung.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {

    // Created lazily so the schema stays serializable when Flink ships it to the cluster.
    ObjectMapper objectMapper;

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
        }
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<InputMessage> getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}
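A minimal round-trip check of the two schemas above, sketched here for reference only: the SchemaRoundTripCheck class is hypothetical and not part of this commit, and it assumes the classes compile as shown with jackson-datatype-jsr310 on the classpath (declared in the pom).

package com.baeldung.schema;

import com.baeldung.model.Backup;
import com.baeldung.model.InputMessage;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collections;

// Hypothetical helper, not part of this commit: a quick manual round-trip
// through the deserialization and serialization schemas defined above.
public class SchemaRoundTripCheck {

    public static void main(String[] args) throws Exception {
        String json = "{\"sender\":\"alice\",\"recipient\":\"bob\","
            + "\"sentAt\":\"2018-08-13T00:08:55\",\"message\":\"hello\"}";

        // Deserialize the way the flink_input consumer does.
        InputMessage message = new InputMessageDeserializationSchema()
            .deserialize(json.getBytes(StandardCharsets.UTF_8));

        // Expected output: alice -> bob at 2018-08-13T00:08:55: hello
        System.out.println(message.getSender() + " -> " + message.getRecipient()
            + " at " + message.getSentAt() + ": " + message.getMessage());

        // Serialize a Backup the way the createBackup() sink does and print the JSON bytes.
        Backup backup = new Backup(Collections.singletonList(message), LocalDateTime.now());
        byte[] bytes = new BackupSerializationSchema().serialize(backup);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}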