[JAVA-18801] Fixed integration tests (#13617)

* [JAVA-18801] Fixed integration tests

* [JAVA-18801] Clean up
This commit is contained in:
panos-kakos 2023-03-17 16:41:16 +02:00 committed by GitHub
parent bcd8625e9b
commit 0d1e9c6711
4 changed files with 39 additions and 21 deletions

View File

@ -35,12 +35,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@ -67,7 +67,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@ -163,11 +163,29 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>
--add-opens java.base/java.time=ALL-UNNAMED
--add-opens java.base/java.nio=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<kafka.version>2.8.0</kafka.version>
<kafka.version>3.4.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<flink.version>1.5.0</flink.version>
<flink.version>1.16.1</flink.version>
<awaitility.version>3.0.0</awaitility.version>
<org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>

View File

@ -9,8 +9,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
@ -25,12 +25,12 @@ public class FlinkDataPipeline {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
FlinkKafkaConsumer<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
@ -48,11 +48,11 @@ public class FlinkDataPipeline {
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
FlinkKafkaConsumer<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
FlinkKafkaProducer<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);

View File

@ -3,26 +3,26 @@ package com.baeldung.flink.connector;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Consumers {
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
public static FlinkKafkaConsumer<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
return consumer;
}
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
public static FlinkKafkaConsumer<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
FlinkKafkaConsumer<InputMessage> consumer = new FlinkKafkaConsumer<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
return consumer;
}

View File

@ -3,15 +3,15 @@ package com.baeldung.flink.connector;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class Producers {
public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
}
public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer011<Backup>(kafkaAddress, topic, new BackupSerializationSchema());
public static FlinkKafkaProducer<Backup> createBackupProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer<Backup>(kafkaAddress, topic, new BackupSerializationSchema());
}
}