JAVA-6390: Move kafka articles from libraries-data-3 to new module apache-kafka
sampadawagde 2021-08-15 17:09:04 +05:30
parent 3f01b5fb0a
commit 0d280b54b9
20 changed files with 128 additions and 148 deletions

View File: FlinkDataPipeline.java (new file in the apache-kafka module)

@@ -0,0 +1,70 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;

public class FlinkDataPipeline {

    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

        stringInputStream.map(new WordsCapitalizer())
            .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();
        flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);

        inputMessagesStream.timeWindowAll(Time.hours(24))
            .aggregate(new BackupAggregator())
            .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }
}
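Note: capitalize() and createBackup() depend on the statically imported Producers helpers, which this commit does not show. A minimal sketch of what they plausibly look like, assuming the FlinkKafkaProducer011(brokerList, topicId, serializationSchema) constructor and the BackupSerializationSchema that appears further down (the schema's package is an assumption):

package com.baeldung.flink.connector;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.schema.BackupSerializationSchema; // package assumed; not shown in this commit
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Producers {

    // Plain-string producer used by capitalize()
    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
    }

    // Backup producer used by createBackup(); serializes Backup objects via the schema below
    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
    }
}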

View File: Consumers.java

@@ -9,23 +9,20 @@ import java.util.Properties;
 public class Consumers {
-    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
-      String topic, String kafkaAddress, String kafkaGroup ) {
+    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
         Properties props = new Properties();
         props.setProperty("bootstrap.servers", kafkaAddress);
-        props.setProperty("group.id",kafkaGroup);
-        FlinkKafkaConsumer011<String> consumer =
-          new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
+        props.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
         return consumer;
     }
-    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
+    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
         Properties properties = new Properties();
         properties.setProperty("bootstrap.servers", kafkaAddress);
-        properties.setProperty("group.id",kafkaGroup);
-        FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
-          topic, new InputMessageDeserializationSchema(),properties);
+        properties.setProperty("group.id", kafkaGroup);
+        FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
         return consumer;
     }

View File: InputMessage.java

@@ -18,6 +18,7 @@ public class InputMessage {
     public String getSender() {
         return sender;
     }
+
     public void setSender(String sender) {
         this.sender = sender;
     }
@@ -55,13 +56,12 @@ public class InputMessage {
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         InputMessage message1 = (InputMessage) o;
-        return Objects.equal(sender, message1.sender) &&
-          Objects.equal(recipient, message1.recipient) &&
-          Objects.equal(sentAt, message1.sentAt) &&
-          Objects.equal(message, message1.message);
+        return Objects.equal(sender, message1.sender) && Objects.equal(recipient, message1.recipient) && Objects.equal(sentAt, message1.sentAt) && Objects.equal(message, message1.message);
     }
     @Override
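The trailing @Override context line introduces hashCode, which the hunk cuts off. Given the Guava Objects.equal calls in equals, the matching method is presumably the Guava form; a sketch, not confirmed by this diff:

@Override
public int hashCode() {
    // com.google.common.base.Objects pairs hashCode(Object...) with equal(a, b)
    return Objects.hashCode(sender, recipient, sentAt, message);
}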

View File: BackupAggregator.java (new file in the apache-kafka module)

@@ -0,0 +1,34 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {

    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        Backup backup = new Backup(inputMessages, LocalDateTime.now());
        return backup;
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}
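The Backup model the aggregator emits is not part of this diff. From the constructor call in getResult(), it must at least carry the message list and a timestamp; a minimal sketch (field names beyond the constructor arguments are assumptions):

package com.baeldung.flink.model;

import java.time.LocalDateTime;
import java.util.List;

public class Backup {

    private List<InputMessage> inputMessages;
    private LocalDateTime backupTimestamp; // name assumed; only the constructor signature is visible above

    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
    }
}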

View File: InputMessageTimestampAssigner.java

@@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {
     @Override
     public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
         ZoneId zoneId = ZoneId.systemDefault();
-        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
+        return element.getSentAt()
+            .atZone(zoneId)
+            .toEpochSecond() * 1000;
     }
     @Nullable
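The @Nullable context line precedes the other AssignerWithPunctuatedWatermarks method, checkAndGetNextWatermark, which the hunk omits. A sketch of a typical implementation; the watermark lag is illustrative, not taken from this commit:

@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
    // Emit a watermark trailing the last event timestamp; the 1500 ms lag is an assumption
    return new Watermark(extractedTimestamp - 1500);
}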

View File: BackupSerializationSchema.java

@@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-public class BackupSerializationSchema
-        implements SerializationSchema<Backup> {
+public class BackupSerializationSchema implements SerializationSchema<Backup> {
     static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@@ -18,7 +17,7 @@ public class BackupSerializationSchema
     @Override
     public byte[] serialize(Backup backupMessage) {
-        if(objectMapper == null) {
+        if (objectMapper == null) {
             objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
             objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
         }
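Note that since objectMapper is initialized statically, the null branch in serialize() is dead code, and if it ever ran, setVisibility would throw a NullPointerException because it executes before the mapper is created. A corrected sketch of the method (the body after the guard is not in the diff and is assumed):

@Override
public byte[] serialize(Backup backupMessage) {
    if (objectMapper == null) {
        // Create the mapper first, then configure it
        objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
    }
    try {
        return objectMapper.writeValueAsBytes(backupMessage); // assumed serialization call
    } catch (JsonProcessingException e) {
        return new byte[0];
    }
}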

View File: InputMessageDeserializationSchema.java

@@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import java.io.IOException;
-public class InputMessageDeserializationSchema implements
-        DeserializationSchema<InputMessage> {
+public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
     static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
     @Override
     public InputMessage deserialize(byte[] bytes) throws IOException {
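The deserialize body is elided by the hunk; with a static Jackson ObjectMapper in the class, it is presumably a single readValue call. A sketch of the three DeserializationSchema methods, hedged as assumptions where the diff is silent:

@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
    return objectMapper.readValue(bytes, InputMessage.class);
}

@Override
public boolean isEndOfStream(InputMessage inputMessage) {
    return false; // the Kafka stream is unbounded
}

@Override
public TypeInformation<InputMessage> getProducedType() {
    return TypeInformation.of(InputMessage.class);
}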

View File: CountryPopulationConsumer.java

@@ -19,9 +19,7 @@ public class CountryPopulationConsumer {
     private java.util.function.Consumer<Throwable> exceptionConsumer;
     private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
-    public CountryPopulationConsumer(
-      Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
-      java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
+    public CountryPopulationConsumer(Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer, java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
         this.consumer = consumer;
         this.exceptionConsumer = exceptionConsumer;
         this.countryPopulationConsumer = countryPopulationConsumer;
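Collapsing the constructor onto one line makes its three collaborators easier to scan: the Kafka consumer, an error handler, and a per-record callback. Wiring it up might look like this (the properties and handlers are illustrative, not from this commit):

// Illustrative usage; consumerProperties and logger are assumed to exist
KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
CountryPopulationConsumer countryConsumer = new CountryPopulationConsumer(
    kafkaConsumer,
    throwable -> logger.error("Consumer failed", throwable),
    countryPopulation -> logger.info("Received {}", countryPopulation));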

View File: KafkaProducer.java

@@ -15,8 +15,7 @@ public class KafkaProducer {
     }
     public Future<RecordMetadata> send(String key, String value) {
-        ProducerRecord record = new ProducerRecord("topic_sports_news",
-          key, value);
+        ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
         return producer.send(record);
     }
@@ -36,5 +35,4 @@ public class KafkaProducer {
         producer.commitTransaction();
     }
 }
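The second hunk's commitTransaction() context shows this wrapper is used transactionally. Against the kafka-clients API, the usual sequence around send() is sketched below; initTransactions() and the abort-on-error handling are assumptions, since only send() and commitTransaction() appear in the diff. Declaring the record as ProducerRecord<String, String> would also avoid the raw-type warning in send():

// Illustrative transactional flow on the underlying org.apache.kafka.clients.producer.Producer
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic_sports_news", key, value));
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction(); // roll back everything sent in this transaction
    throw e;
}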

View File: FlinkDataPipeline.java (deleted from libraries-data-3)

@@ -1,82 +0,0 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;

public class FlinkDataPipeline {

    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment =
          StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer =
          createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream =
          environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer =
          createStringProducer(outputTopic, address);

        stringInputStream
          .map(new WordsCapitalizer())
          .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void createBackup () throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment =
          StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
          createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();
        flinkKafkaConsumer
          .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Backup> flinkKafkaProducer =
          createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream =
          environment.addSource(flinkKafkaConsumer);

        inputMessagesStream
          .timeWindowAll(Time.hours(24))
          .aggregate(new BackupAggregator())
          .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }
}

View File: BackupAggregator.java (deleted from libraries-data-3)

@@ -1,34 +0,0 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {

    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        Backup backup = new Backup(inputMessages, LocalDateTime.now());
        return backup;
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}