From 0d1e9c6711e0bf024ce460bf0eabf41d4ce90b13 Mon Sep 17 00:00:00 2001 From: panos-kakos <102670093+panos-kakos@users.noreply.github.com> Date: Fri, 17 Mar 2023 16:41:16 +0200 Subject: [PATCH] [JAVA-18801] Fixed integration tests (#13617) * [JAVA-18801] Fixed integration tests * [JAVA-18801] Clean up --- apache-kafka/pom.xml | 28 +++++++++++++++---- .../com/baeldung/flink/FlinkDataPipeline.java | 12 ++++---- .../baeldung/flink/connector/Consumers.java | 10 +++---- .../baeldung/flink/connector/Producers.java | 10 +++---- 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/apache-kafka/pom.xml b/apache-kafka/pom.xml index 7bde041f32..915583fed8 100644 --- a/apache-kafka/pom.xml +++ b/apache-kafka/pom.xml @@ -35,12 +35,12 @@ org.apache.flink - flink-connector-kafka-0.11_2.11 + flink-connector-kafka ${flink.version} org.apache.flink - flink-streaming-java_2.11 + flink-streaming-java ${flink.version} @@ -67,7 +67,7 @@ org.apache.flink - flink-test-utils_2.11 + flink-test-utils ${flink.version} test @@ -163,11 +163,29 @@ + + + + org.apache.maven.plugins + maven-surefire-plugin + + + --add-opens java.base/java.time=ALL-UNNAMED + --add-opens java.base/java.nio=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + - 2.8.0 + 3.4.0 1.15.3 1.15.3 - 1.5.0 + 1.16.1 3.0.0 2.4.8 0.8.1-spark3.0-s_2.12 diff --git a/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java index 4502b628b2..8d15b79a63 100644 --- a/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java @@ -9,8 +9,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import static com.baeldung.flink.connector.Consumers.*; import static com.baeldung.flink.connector.Producers.*; @@ -25,12 +25,12 @@ public class FlinkDataPipeline { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup); + FlinkKafkaConsumer flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); DataStream stringInputStream = environment.addSource(flinkKafkaConsumer); - FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer(outputTopic, address); + FlinkKafkaProducer flinkKafkaProducer = createStringProducer(outputTopic, address); stringInputStream.map(new WordsCapitalizer()) .addSink(flinkKafkaProducer); @@ -48,11 +48,11 @@ public class FlinkDataPipeline { environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); + FlinkKafkaConsumer flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()); - FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); + FlinkKafkaProducer flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); diff --git a/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java b/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java index c72cb8a2d6..358c9627f7 100644 --- a/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java @@ -3,26 +3,26 @@ package com.baeldung.flink.connector; import com.baeldung.flink.model.InputMessage; import com.baeldung.flink.schema.InputMessageDeserializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class Consumers { - public static FlinkKafkaConsumer011 createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) { + public static FlinkKafkaConsumer createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) { Properties props = new Properties(); props.setProperty("bootstrap.servers", kafkaAddress); props.setProperty("group.id", kafkaGroup); - FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props); + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props); return consumer; } - public static FlinkKafkaConsumer011 createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) { + public static FlinkKafkaConsumer createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", kafkaAddress); properties.setProperty("group.id", kafkaGroup); - FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(topic, new InputMessageDeserializationSchema(), properties); + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(topic, new InputMessageDeserializationSchema(), properties); return consumer; } diff --git a/apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java b/apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java index 8e6f3f8f37..a4cb2d70c2 100644 --- a/apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java @@ -3,15 +3,15 @@ package com.baeldung.flink.connector; import com.baeldung.flink.model.Backup; import com.baeldung.flink.schema.BackupSerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class Producers { - public static FlinkKafkaProducer011 createStringProducer(String topic, String kafkaAddress) { - return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema()); + public static FlinkKafkaProducer createStringProducer(String topic, String kafkaAddress) { + return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema()); } - public static FlinkKafkaProducer011 createBackupProducer(String topic, String kafkaAddress) { - return new FlinkKafkaProducer011(kafkaAddress, topic, new BackupSerializationSchema()); + public static FlinkKafkaProducer createBackupProducer(String topic, String kafkaAddress) { + return new FlinkKafkaProducer(kafkaAddress, topic, new BackupSerializationSchema()); } }