BAEL-1994: Updates and small changes

This commit is contained in:
DomWos 2018-08-24 11:14:40 +02:00
parent 610d0ce869
commit 1921b585a6
13 changed files with 198 additions and 28 deletions

View File

@ -204,7 +204,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId> <artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

View File

@ -1,10 +1,11 @@
package com.baeldung; package com.baeldung.flink;
import com.baeldung.flink.BackupAggregator;
import com.baeldung.flink.InputMessageTimestampAssigner; import com.baeldung.flink.model.Backup;
import com.baeldung.flink.WordsCapitalizer; import com.baeldung.flink.model.InputMessage;
import com.baeldung.model.Backup; import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.model.InputMessage; import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@ -12,8 +13,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.Consumers.*; import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.Producers.*; import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline { public class FlinkDataPipeline {

View File

@ -1,7 +1,7 @@
package com.baeldung.flink; package com.baeldung.flink.connector;
import com.baeldung.schema.InputMessageDeserializationSchema; import com.baeldung.flink.model.InputMessage;
import com.baeldung.model.InputMessage; import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

View File

@ -1,7 +1,7 @@
package com.baeldung.flink; package com.baeldung.flink.connector;
import com.baeldung.schema.BackupSerializationSchema; import com.baeldung.flink.model.Backup;
import com.baeldung.model.Backup; import com.baeldung.flink.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
@ -12,6 +12,6 @@ public class Producers {
} }
public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) { public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema()); return new FlinkKafkaProducer011<Backup>(kafkaAddress, topic, new BackupSerializationSchema());
} }
} }

View File

@ -1,4 +1,4 @@
package com.baeldung.model; package com.baeldung.flink.model;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -20,4 +20,8 @@ public class Backup {
this.backupTimestamp = backupTimestamp; this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID(); this.uuid = UUID.randomUUID();
} }
public List<InputMessage> getInputMessages() {
return inputMessages;
}
} }

View File

@ -1,6 +1,8 @@
package com.baeldung.model; package com.baeldung.flink.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Objects;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@JsonSerialize @JsonSerialize
@ -10,6 +12,9 @@ public class InputMessage {
LocalDateTime sentAt; LocalDateTime sentAt;
String message; String message;
public InputMessage() {
}
public String getSender() { public String getSender() {
return sender; return sender;
} }
@ -41,4 +46,27 @@ public class InputMessage {
public void setMessage(String message) { public void setMessage(String message) {
this.message = message; this.message = message;
} }
public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
this.sender = sender;
this.recipient = recipient;
this.sentAt = sentAt;
this.message = message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InputMessage message1 = (InputMessage) o;
return Objects.equal(sender, message1.sender) &&
Objects.equal(recipient, message1.recipient) &&
Objects.equal(sentAt, message1.sentAt) &&
Objects.equal(message, message1.message);
}
@Override
public int hashCode() {
return Objects.hashCode(sender, recipient, sentAt, message);
}
} }

View File

@ -1,7 +1,7 @@
package com.baeldung.flink; package com.baeldung.flink.operator;
import com.baeldung.model.Backup; import com.baeldung.flink.model.Backup;
import com.baeldung.model.InputMessage; import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime; import java.time.LocalDateTime;

View File

@ -1,6 +1,6 @@
package com.baeldung.flink; package com.baeldung.flink.operator;
import com.baeldung.model.InputMessage; import com.baeldung.flink.model.InputMessage;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;

View File

@ -1,4 +1,4 @@
package com.baeldung.flink; package com.baeldung.flink.operator;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;

View File

@ -1,6 +1,6 @@
package com.baeldung.schema; package com.baeldung.flink.schema;
import com.baeldung.model.Backup; import com.baeldung.flink.model.Backup;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema;

View File

@ -1,6 +1,6 @@
package com.baeldung.schema; package com.baeldung.flink.schema;
import com.baeldung.model.InputMessage; import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema;

View File

@ -0,0 +1,103 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.schema.BackupSerializationSchema;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.collections.ListUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class BackupCreatorIntegrationTest {
public static ObjectMapper mapper;
@Before
public void setup() {
mapper = new ObjectMapper().registerModule(new JavaTimeModule());
}
@Test
public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
byte[] messageSerialized = mapper.writeValueAsBytes(message);
DeserializationSchema<InputMessage> deserializationSchema = new InputMessageDeserializationSchema();
InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);
assertEquals(message, messageDeserialized);
}
@Test
public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUser_thenMessagesAreGroupedProperly() throws Exception {
LocalDateTime currentTime = LocalDateTime.now();
InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage");
InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage");
List<InputMessage> firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage);
List<InputMessage> secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage);
List<InputMessage> inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<InputMessage> testDataSet = env.fromCollection(inputMessages);
CollectingSink sink = new CollectingSink();
testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner())
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(sink);
env.execute();
Awaitility.await().until(() -> sink.backups.size() == 2);
assertEquals(2, sink.backups.size());
assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages());
assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages());
}
@Test
public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException {
InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
List<InputMessage> messages = Arrays.asList(message);
Backup backup = new Backup(messages, LocalDateTime.now());
byte[] backupSerialized = mapper.writeValueAsBytes(backup);
SerializationSchema<Backup> serializationSchema = new BackupSerializationSchema();
byte[] backupProcessed = serializationSchema.serialize(backup);
assertEquals(backupSerialized, backupProcessed);
}
private static class CollectingSink implements SinkFunction<Backup> {
public static List<Backup> backups = new ArrayList<>();
@Override
public synchronized void invoke(Backup value, Context context) throws Exception {
backups.add(value);
}
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.flink;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class WordCapitalizerIntegrationTest {
@Test
public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<String> data = Arrays.asList("dog", "cat", "wolf", "pig");
DataSet<String> testDataSet = env.fromCollection(data);
List<String> dataProcessed = testDataSet
.map(new WordsCapitalizer())
.collect();
List<String> testDataCapitalized = data.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
Assert.assertEquals(testDataCapitalized, dataProcessed);
}
}