Merge branch 'task/JAVA-3247' of https://github.com/dkapil/tutorials into task/JAVA-3247

This commit is contained in:
Dhawal Kapil 2021-08-21 14:36:58 +05:30
commit a070ea706d
107 changed files with 11447 additions and 2474 deletions

18
apache-kafka/README.md Normal file
View File

@ -0,0 +1,18 @@
## Apache Kafka
This module contains articles about Apache Kafka.
### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE.

180
apache-kafka/pom.xml Normal file
View File

@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>apache-kafka</artifactId>
<name>apache-kafka</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-proxy</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>${com.datastax.spark.spark-cassandra-connector.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.6.2</assertj.version>
<kafka.version>2.8.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<flink.version>1.5.0</flink.version>
<awaitility.version>3.0.0</awaitility.version>
<guava.version>29.0-jre</guava.version>
<org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
</properties>
</project>

View File

@ -0,0 +1,70 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
public static void capitalize() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
inputMessagesStream.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void main(String[] args) throws Exception {
createBackup();
}
}

View File

@ -9,23 +9,20 @@ import java.util.Properties;
public class Consumers {
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
return consumer;
}
return consumer;
}
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
topic, new InputMessageDeserializationSchema(),properties);
properties.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
return consumer;
}

View File

@ -18,6 +18,7 @@ public class InputMessage {
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
@ -55,12 +56,14 @@ public class InputMessage {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
InputMessage message1 = (InputMessage) o;
return Objects.equal(sender, message1.sender) &&
Objects.equal(recipient, message1.recipient) &&
Objects.equal(sentAt, message1.sentAt) &&
return Objects.equal(sender, message1.sender) &&
Objects.equal(recipient, message1.recipient) &&
Objects.equal(sentAt, message1.sentAt) &&
Objects.equal(message, message1.message);
}

View File

@ -0,0 +1,34 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
Backup backup = new Backup(inputMessages, LocalDateTime.now());
return backup;
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

View File

@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate
@Override
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
return element.getSentAt()
.atZone(zoneId)
.toEpochSecond() * 1000;
}
@Nullable

View File

@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BackupSerializationSchema
implements SerializationSchema<Backup> {
public class BackupSerializationSchema implements SerializationSchema<Backup> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@ -18,7 +17,7 @@ public class BackupSerializationSchema
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
if (objectMapper == null) {
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
}

View File

@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {

View File

@ -27,11 +27,11 @@ public class KafkaTopicApplication {
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic));
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
// get the async result for the new topic creation
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
// call get() to block until topic creation has completed or failed
future.get();
@ -47,15 +47,13 @@ public class KafkaTopicApplication {
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
.validateOnly(true)
.retryOnQuotaViolation(true);
CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true)
.retryOnQuotaViolation(true);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic), topicOptions
);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions);
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
future.get();
}
}
@ -72,14 +70,12 @@ public class KafkaTopicApplication {
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
.configs(newTopicConfig);
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic)
);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
future.get();
}
}

View File

@ -19,9 +19,7 @@ public class CountryPopulationConsumer {
private java.util.function.Consumer<Throwable> exceptionConsumer;
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
public CountryPopulationConsumer(
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
public CountryPopulationConsumer(Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer, java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
this.consumer = consumer;
this.exceptionConsumer = exceptionConsumer;
this.countryPopulationConsumer = countryPopulationConsumer;

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -24,16 +24,16 @@ public class TransactionalMessageProducer {
producer.initTransactions();
try{
try {
producer.beginTransaction();
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
new ProducerRecord<String, String>("input", null, s)));
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
.forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
producer.commitTransaction();
}catch (KafkaException e){
} catch (KafkaException e) {
producer.abortTransaction();

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -43,10 +43,11 @@ public class TransactionalWordCount {
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
.stream()
.flatMap(record -> Stream.of(record.value().split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
.stream()
.flatMap(record -> Stream.of(record.value()
.split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
producer.beginTransaction();
@ -56,7 +57,8 @@ public class TransactionalWordCount {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
long offset = partitionedRecords.get(partitionedRecords.size() - 1)
.offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}
@ -72,7 +74,6 @@ public class TransactionalWordCount {
}
}
private static KafkaConsumer<String, String> createKafkaConsumer() {

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
public class Tuple {
@ -10,8 +10,8 @@ public class Tuple {
this.value = value;
}
public static Tuple of(String key, Integer value){
return new Tuple(key,value);
public static Tuple of(String key, Integer value) {
return new Tuple(key, value);
}
public String getKey() {

View File

@ -15,8 +15,7 @@ public class KafkaProducer {
}
public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news",
key, value);
ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
return producer.send(record);
}
@ -36,5 +35,4 @@ public class KafkaProducer {
producer.commitTransaction();
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka.streams;
package com.baeldung.kafka.streamsvsconsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;

View File

@ -1,61 +1,78 @@
package com.baeldung.kafkastreams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.Ignore;
import org.junit.Test;
public class KafkaStreamsLiveTest {
private String bootstrapServers = "localhost:9092";
private Path stateDirectory;
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
//given
// given
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
.getClass()
.getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
.getClass()
.getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
try {
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
.toString());
} catch (final IOException e) {
throw new UncheckedIOException("Cannot create temporary directory", e);
}
//when
KStreamBuilder builder = new KStreamBuilder();
// when
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
//then
// then
Thread.sleep(30000);
streams.close();
}

View File

@ -1,48 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>core-java-16</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>core-java-16</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>core-java-16</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>core-java-16</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${maven.compiler.source.version}</source>
<target>${maven.compiler.target.version}</target>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${maven.compiler.source.version}</source>
<target>${maven.compiler.target.version}</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source.version>16</maven.compiler.source.version>
<maven.compiler.target.version>16</maven.compiler.target.version>
<assertj.version>3.6.1</assertj.version>
</properties>
<properties>
<maven.compiler.source.version>16</maven.compiler.source.version>
<maven.compiler.target.version>16</maven.compiler.target.version>
<assertj.version>3.6.1</assertj.version>
</properties>
</project>

View File

@ -0,0 +1,38 @@
package com.baeldung.java_16_features.groupingby;
public class BlogPost {
private String title;
private String author;
private BlogPostType type;
private int likes;
record AuthPostTypesLikes(String author, BlogPostType type, int likes) {};
public BlogPost(String title, String author, BlogPostType type, int likes) {
this.title = title;
this.author = author;
this.type = type;
this.likes = likes;
}
public String getTitle() {
return title;
}
public String getAuthor() {
return author;
}
public BlogPostType getType() {
return type;
}
public int getLikes() {
return likes;
}
@Override
public String toString() {
return "BlogPost{" + "title='" + title + '\'' + ", type=" + type + ", likes=" + likes + '}';
}
}

View File

@ -0,0 +1,5 @@
package com.baeldung.java_16_features.groupingby;
public enum BlogPostType {
NEWS, REVIEW, GUIDE
}

View File

@ -0,0 +1,41 @@
package com.baeldung.java_16_features.groupingby;
import java.util.Objects;
public class Tuple {
private final BlogPostType type;
private final String author;
public Tuple(BlogPostType type, String author) {
this.type = type;
this.author = author;
}
public BlogPostType getType() {
return type;
}
public String getAuthor() {
return author;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Tuple tuple = (Tuple) o;
return type == tuple.type && author.equals(tuple.author);
}
@Override
public int hashCode() {
return Objects.hash(type, author);
}
@Override
public String toString() {
return "Tuple{" + "type=" + type + ", author='" + author + '\'' + '}';
}
}

View File

@ -0,0 +1,254 @@
package com.baeldung.java_16_features.groupingby;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.averagingInt;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.groupingByConcurrent;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.maxBy;
import static java.util.stream.Collectors.summarizingInt;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
public class JavaGroupingByCollectorUnitTest {
private static final List<BlogPost> posts = Arrays.asList(new BlogPost("News item 1", "Author 1", BlogPostType.NEWS, 15), new BlogPost("Tech review 1", "Author 2", BlogPostType.REVIEW, 5),
new BlogPost("Programming guide", "Author 1", BlogPostType.GUIDE, 20), new BlogPost("News item 2", "Author 2", BlogPostType.NEWS, 35), new BlogPost("Tech review 2", "Author 1", BlogPostType.REVIEW, 15));
@Test
public void givenAListOfPosts_whenGroupedByType_thenGetAMapBetweenTypeAndPosts() {
Map<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndTheirTitlesAreJoinedInAString_thenGetAMapBetweenTypeAndCsvTitles() {
Map<BlogPostType, String> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, mapping(BlogPost::getTitle, joining(", ", "Post titles: [", "]"))));
assertEquals("Post titles: [News item 1, News item 2]", postsPerType.get(BlogPostType.NEWS));
assertEquals("Post titles: [Programming guide]", postsPerType.get(BlogPostType.GUIDE));
assertEquals("Post titles: [Tech review 1, Tech review 2]", postsPerType.get(BlogPostType.REVIEW));
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndSumTheLikes_thenGetAMapBetweenTypeAndPostLikes() {
Map<BlogPostType, Integer> likesPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, summingInt(BlogPost::getLikes)));
assertEquals(50, likesPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(20, likesPerType.get(BlogPostType.REVIEW)
.intValue());
assertEquals(20, likesPerType.get(BlogPostType.GUIDE)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeInAnEnumMap_thenGetAnEnumMapBetweenTypeAndPosts() {
EnumMap<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, () -> new EnumMap<>(BlogPostType.class), toList()));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeInSets_thenGetAMapBetweenTypesAndSetsOfPosts() {
Map<BlogPostType, Set<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, toSet()));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeConcurrently_thenGetAMapBetweenTypeAndPosts() {
ConcurrentMap<BlogPostType, List<BlogPost>> postsPerType = posts.parallelStream()
.collect(groupingByConcurrent(BlogPost::getType));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndAveragingLikes_thenGetAMapBetweenTypeAndAverageNumberOfLikes() {
Map<BlogPostType, Double> averageLikesPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, averagingInt(BlogPost::getLikes)));
assertEquals(25, averageLikesPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(20, averageLikesPerType.get(BlogPostType.GUIDE)
.intValue());
assertEquals(10, averageLikesPerType.get(BlogPostType.REVIEW)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndCounted_thenGetAMapBetweenTypeAndNumberOfPosts() {
Map<BlogPostType, Long> numberOfPostsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, counting()));
assertEquals(2, numberOfPostsPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(1, numberOfPostsPerType.get(BlogPostType.GUIDE)
.intValue());
assertEquals(2, numberOfPostsPerType.get(BlogPostType.REVIEW)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndMaxingLikes_thenGetAMapBetweenTypeAndMaximumNumberOfLikes() {
Map<BlogPostType, Optional<BlogPost>> maxLikesPerPostType = posts.stream()
.collect(groupingBy(BlogPost::getType, maxBy(comparingInt(BlogPost::getLikes))));
assertTrue(maxLikesPerPostType.get(BlogPostType.NEWS)
.isPresent());
assertEquals(35, maxLikesPerPostType.get(BlogPostType.NEWS)
.get()
.getLikes());
assertTrue(maxLikesPerPostType.get(BlogPostType.GUIDE)
.isPresent());
assertEquals(20, maxLikesPerPostType.get(BlogPostType.GUIDE)
.get()
.getLikes());
assertTrue(maxLikesPerPostType.get(BlogPostType.REVIEW)
.isPresent());
assertEquals(15, maxLikesPerPostType.get(BlogPostType.REVIEW)
.get()
.getLikes());
}
@Test
public void givenAListOfPosts_whenGroupedByAuthorAndThenByType_thenGetAMapBetweenAuthorAndMapsBetweenTypeAndBlogPosts() {
Map<String, Map<BlogPostType, List<BlogPost>>> map = posts.stream()
.collect(groupingBy(BlogPost::getAuthor, groupingBy(BlogPost::getType)));
assertEquals(1, map.get("Author 1")
.get(BlogPostType.NEWS)
.size());
assertEquals(1, map.get("Author 1")
.get(BlogPostType.GUIDE)
.size());
assertEquals(1, map.get("Author 1")
.get(BlogPostType.REVIEW)
.size());
assertEquals(1, map.get("Author 2")
.get(BlogPostType.NEWS)
.size());
assertEquals(1, map.get("Author 2")
.get(BlogPostType.REVIEW)
.size());
assertNull(map.get("Author 2")
.get(BlogPostType.GUIDE));
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndSummarizingLikes_thenGetAMapBetweenTypeAndSummary() {
Map<BlogPostType, IntSummaryStatistics> likeStatisticsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, summarizingInt(BlogPost::getLikes)));
IntSummaryStatistics newsLikeStatistics = likeStatisticsPerType.get(BlogPostType.NEWS);
assertEquals(2, newsLikeStatistics.getCount());
assertEquals(50, newsLikeStatistics.getSum());
assertEquals(25.0, newsLikeStatistics.getAverage(), 0.001);
assertEquals(35, newsLikeStatistics.getMax());
assertEquals(15, newsLikeStatistics.getMin());
}
@Test
public void givenAListOfPosts_whenGroupedByComplexMapPairKeyType_thenGetAMapBetweenPairAndList() {
Map<Pair<BlogPostType, String>, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new ImmutablePair<>(post.getType(), post.getAuthor())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new ImmutablePair<>(BlogPostType.GUIDE, "Author 1"));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
}
@Test
public void givenAListOfPosts_whenGroupedByComplexMapKeyType_thenGetAMapBetweenTupleAndList() {
Map<Tuple, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new Tuple(post.getType(), post.getAuthor())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new Tuple(BlogPostType.GUIDE, "Author 1"));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
}
@Test
public void givenAListOfPosts_whenGroupedByRecord_thenGetAMapBetweenRecordAndList() {
Map<BlogPost.AuthPostTypesLikes, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new BlogPost.AuthPostTypesLikes(post.getAuthor(), post.getType(), post.getLikes())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new BlogPost.AuthPostTypesLikes("Author 1", BlogPostType.GUIDE, 20));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
assertThat(blogPost.getLikes()).isEqualTo(20);
}
}

View File

@ -189,21 +189,8 @@ public class StopExecution {
longRunningSort();
}
private void longRunningOperation() {
LOG.info("long Running operation started");
try {
//Thread.sleep(500);
longFileRead();
LOG.info("long running operation finished");
} catch (InterruptedException e) {
LOG.info("long Running operation interrupted");
}
}
private void longRunningSort() {
LOG.info("long Running task started");
// Do you long running calculation here
LOG.info("Long running task started");
int len = 100000;
List<Integer> numbers = new ArrayList<>();
try {
@ -229,25 +216,7 @@ public class StopExecution {
LOG.info("Index position: " + i);
LOG.info("Long running task finished");
} catch (InterruptedException e) {
LOG.info("long Running operation interrupted");
}
}
private void longFileRead() throws InterruptedException {
String file = "input.txt";
ClassLoader classloader = getClass().getClassLoader();
try (InputStream inputStream = classloader.getResourceAsStream(file)) {
Reader inputStreamReader = new InputStreamReader(inputStream);
int data = inputStreamReader.read();
while (data != -1) {
char theChar = (char) data;
data = inputStreamReader.read();
throwExceptionOnThreadInterrupt();
}
} catch (IOException e) {
LOG.error("Exception: ", e);
LOG.info("Long running operation interrupted");
}
}

View File

@ -13,12 +13,9 @@ import java.util.stream.IntStream;
public class TimeApi {
public static List<Date> getDatesBetweenUsingJava7(Date startDate, Date endDate) {
List<Date> datesInRange = new ArrayList<Date>();
Calendar calendar = new GregorianCalendar();
calendar.setTime(startDate);
Calendar endCalendar = new GregorianCalendar();
endCalendar.setTime(endDate);
List<Date> datesInRange = new ArrayList<>();
Calendar calendar = getCalendarWithoutTime(startDate);
Calendar endCalendar = getCalendarWithoutTime(endDate);
while (calendar.before(endCalendar)) {
Date result = calendar.getTime();
@ -40,4 +37,15 @@ public class TimeApi {
return startDate.datesUntil(endDate).collect(Collectors.toList());
}
private static Calendar getCalendarWithoutTime(Date date) {
Calendar calendar = new GregorianCalendar();
calendar.setTime(date);
calendar.set(Calendar.HOUR, 0);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
return calendar;
}
}

View File

@ -1,12 +1,13 @@
package com.baeldung.java9.time;
import org.junit.Test;
import java.time.LocalDate;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TimeApiUnitTest {
@ -18,19 +19,18 @@ public class TimeApiUnitTest {
Date endDate = endCalendar.getTime();
List<Date> dates = TimeApi.getDatesBetweenUsingJava7(startDate, endDate);
assertEquals(dates.size(), 2);
assertThat(dates).hasSize(2);
Calendar calendar = Calendar.getInstance();
Date date1 = calendar.getTime();
assertEquals(dates.get(0).getDay(), date1.getDay());
assertEquals(dates.get(0).getMonth(), date1.getMonth());
assertEquals(dates.get(0).getYear(), date1.getYear());
Date expectedDate1 = calendar.getTime();
assertThat(dates.get(0)).isInSameDayAs(expectedDate1);
assertThatTimeFieldsAreZero(dates.get(0));
calendar.add(Calendar.DATE, 1);
Date date2 = calendar.getTime();
assertEquals(dates.get(1).getDay(), date2.getDay());
assertEquals(dates.get(1).getMonth(), date2.getMonth());
assertEquals(dates.get(1).getYear(), date2.getYear());
Date expectedDate2 = calendar.getTime();
assertThat(dates.get(1)).isInSameDayAs(expectedDate2);
assertThatTimeFieldsAreZero(dates.get(1));
}
@Test
@ -39,9 +39,8 @@ public class TimeApiUnitTest {
LocalDate endDate = LocalDate.now().plusDays(2);
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava8(startDate, endDate);
assertEquals(dates.size(), 2);
assertEquals(dates.get(0), LocalDate.now());
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
}
@Test
@ -50,9 +49,15 @@ public class TimeApiUnitTest {
LocalDate endDate = LocalDate.now().plusDays(2);
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava9(startDate, endDate);
assertEquals(dates.size(), 2);
assertEquals(dates.get(0), LocalDate.now());
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
}
private static void assertThatTimeFieldsAreZero(Date date) {
assertThat(date).hasHourOfDay(0);
assertThat(date).hasMinute(0);
assertThat(date).hasSecond(0);
assertThat(date).hasMillisecond(0);
}
}

View File

@ -7,3 +7,4 @@ This module contains article about constructors in Java
- [Java Copy Constructor](https://www.baeldung.com/java-copy-constructor)
- [Cannot Reference “X” Before Supertype Constructor Has Been Called](https://www.baeldung.com/java-cannot-reference-x-before-supertype-constructor-error)
- [Private Constructors in Java](https://www.baeldung.com/java-private-constructors)
- [Throwing Exceptions in Constructors](https://www.baeldung.com/java-constructors-exceptions)

View File

@ -25,9 +25,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenGrepWithSimpleString_thenCorrect() {
int expectedLineCount = 4;
int expectedLineCount = 5;
// grep "NINETEEN" dictionary.txt
// grep "NINETEEN" dictionary.in
List<Line> lines = Unix4j.grep("NINETEEN", fileToGrep).toLineList();
assertEquals(expectedLineCount, lines.size());
@ -35,9 +35,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenInverseGrepWithSimpleString_thenCorrect() {
int expectedLineCount = 178687;
int expectedLineCount = 8;
// grep -v "NINETEEN" dictionary.txt
// grep -v "NINETEEN" dictionary.in
List<Line> lines = grep(Options.v, "NINETEEN", fileToGrep).toLineList();
assertEquals(expectedLineCount, lines.size());
@ -45,9 +45,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenGrepWithRegex_thenCorrect() {
int expectedLineCount = 151;
int expectedLineCount = 5;
// grep -c ".*?NINE.*?" dictionary.txt
// grep -c ".*?NINE.*?" dictionary.in
String patternCount = grep(Options.c, ".*?NINE.*?", fileToGrep).cut(fields, ":", 1).toStringResult();
assertEquals(expectedLineCount, Integer.parseInt(patternCount));

View File

@ -0,0 +1,13 @@
EIGHTTEEN
EIGHTTEENS
EIGHTTEENTH
EIGHTTEENTHS
NINETEEN
NINETEENS
NINETEENTH
NINETEENTHS
TWENTY
TWENTHIES
TWENTHIETH
TWENTHIETHS
TWENTYNINETEEN

View File

@ -4,3 +4,4 @@
- [Java (String) or .toString()?](https://www.baeldung.com/java-string-casting-vs-tostring)
- [Split Java String by Newline](https://www.baeldung.com/java-string-split-by-newline)
- [Split a String in Java and Keep the Delimiters](https://www.baeldung.com/java-split-string-keep-delimiters)
- [Validate String as Filename in Java](https://www.baeldung.com/java-validate-filename)

View File

@ -3,6 +3,11 @@ plugins {
id 'org.springframework.boot' version '2.3.4.RELEASE'
}
ext {
springBootVersion = '2.3.4.RELEASE'
lombokVersion = '1.18.14'
}
group = 'com.gradle'
version = '1.0.0'
sourceCompatibility = '14'
@ -12,19 +17,16 @@ repositories {
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter:2.3.4.RELEASE'
implementation "org.springframework.boot:spring-boot-starter:${springBootVersion}"
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.3.4.RELEASE'
compileOnly 'org.projectlombok:lombok:1.18.14'
testCompileOnly 'org.projectlombok:lombok:1.18.14'
compileOnly "org.projectlombok:lombok:${lombokVersion}"
runtimeOnly files('libs/sampleOne.jar', 'libs/sampleTwo.jar')
runtimeOnly fileTree('libs') { include '*.jar' }
runtimeOnly fileTree("libs") { include "*.jar" }
// implementation gradleApi()
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
}
test {

234
gradle/gradle-dependency-management/gradlew vendored Executable file
View File

@ -0,0 +1,234 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
#
# Gradle start up script for POSIX generated by Gradle.
#
# Important for running:
#
# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
# noncompliant, but you have some other compliant shell such as ksh or
# bash, then to run this script, type that shell name before the whole
# command line, like:
#
# ksh Gradle
#
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
# * compound commands having a testable exit status, especially «case»;
# * various built-in commands including «command», «set», and «ulimit».
#
# Important for patching:
#
# (2) This script targets any POSIX shell, so it avoids extensions provided
# by Bash, Ksh, etc; in particular arrays are avoided.
#
# The "traditional" practice of packing multiple parameters into a
# space-separated string is a well documented source of bugs and security
# problems, so this is (mostly) avoided, by progressively accumulating
# options in "$@", and eventually passing that to Java.
#
# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
# see the in-line comments for details.
#
# There are tweaks for specific operating systems such as AIX, CygWin,
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
#
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
app_path=$0
# Need this for daisy-chained symlinks.
while
APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
[ -h "$app_path" ]
do
ls=$( ls -ld "$app_path" )
link=${ls#*' -> '}
case $link in #(
/*) app_path=$link ;; #(
*) app_path=$APP_HOME$link ;;
esac
done
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
APP_NAME="Gradle"
APP_BASE_NAME=${0##*/}
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
warn () {
echo "$*"
} >&2
die () {
echo
echo "$*"
echo
exit 1
} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "$( uname )" in #(
CYGWIN* ) cygwin=true ;; #(
Darwin* ) darwin=true ;; #(
MSYS* | MINGW* ) msys=true ;; #(
NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD=$JAVA_HOME/jre/sh/java
else
JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD=java
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
fi
# Collect all arguments for the java command, stacking in reverse order:
# * args from the command line
# * the main class name
# * -classpath
# * -D...appname settings
# * --module-path (only if needed)
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
if "$cygwin" || "$msys" ; then
APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
JAVACMD=$( cygpath --unix "$JAVACMD" )
# Now convert the arguments - kludge to limit ourselves to /bin/sh
for arg do
if
case $arg in #(
-*) false ;; # don't mess with options #(
/?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
[ -e "$t" ] ;; #(
*) false ;;
esac
then
arg=$( cygpath --path --ignore --mixed "$arg" )
fi
# Roll the args list around exactly as many times as the number of
# args, so each arg winds up back in the position where it started, but
# possibly modified.
#
# NB: a `for` loop captures its iteration list before it begins, so
# changing the positional parameters here affects neither the number of
# iterations, nor the values presented in `arg`.
shift # remove old arg
set -- "$@" "$arg" # push replacement arg
done
fi
# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \
"$@"
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
#
# In Bash we could simply go:
#
# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
# set -- "${ARGS[@]}" "$@"
#
# but POSIX shell has neither arrays nor command substitution, so instead we
# post-process each arg (as a line of input to sed) to backslash-escape any
# character that might be a shell metacharacter, then use eval to reverse
# that process (while maintaining the separation between arguments), and wrap
# the whole thing up as a single "set" statement.
#
# This will of course break if any of these variables contains a newline or
# an unmatched quote.
#
eval "set -- $(
printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
xargs -n1 |
sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
tr '\n' ' '
)" '"$@"'
exec "$JAVACMD" "$@"

View File

@ -0,0 +1,89 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega

Binary file not shown.

Binary file not shown.

View File

@ -13,7 +13,6 @@ Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-m
- [Implementing a FTP-Client in Java](https://www.baeldung.com/java-ftp-client)
- [Introduction to Functional Java](https://www.baeldung.com/java-functional-library)
- [A Guide to the Reflections Library](https://www.baeldung.com/reflections-library)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
- [Introduction to Protonpack](https://www.baeldung.com/java-protonpack)
- [Java-R Integration](https://www.baeldung.com/java-r-integration)
- [Using libphonenumber to Validate Phone Numbers](https://www.baeldung.com/java-libphonenumber)

View File

@ -22,18 +22,6 @@
<artifactId>protonpack</artifactId>
<version>${protonpack.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
@ -148,7 +136,6 @@
</build>
<properties>
<kafka.version>2.0.0</kafka.version>
<javapoet.version>1.10.0</javapoet.version>
<reflections.version>0.9.11</reflections.version>
<mockftpserver.version>2.7.1</mockftpserver.version>

View File

@ -11,8 +11,6 @@ This module contains articles about libraries for data processing in Java.
- [An Introduction to SuanShu](https://www.baeldung.com/suanshu)
- [Intro to Derive4J](https://www.baeldung.com/derive4j)
- [Univocity Parsers](https://www.baeldung.com/java-univocity-parsers)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- More articles: [[<-- prev]](/../libraries-data)
##### Building the project

View File

@ -116,11 +116,6 @@
<artifactId>univocity-parsers</artifactId>
<version>${univocity.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -144,13 +139,6 @@
<version>${byte-buddy.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
@ -176,7 +164,6 @@
<slf4j.version>1.7.25</slf4j.version>
<awaitility.version>3.0.0</awaitility.version>
<univocity.version>2.8.4</univocity.version>
<kafka.version>2.5.0</kafka.version>
<guava.version>29.0-jre</guava.version>
</properties>

View File

@ -1,82 +0,0 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
public static void capitalize() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer =
createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream =
environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer =
createStringProducer(outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer
.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer =
createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream =
environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void main(String[] args) throws Exception {
createBackup();
}
}

View File

@ -1,34 +0,0 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
Backup backup = new Backup(inputMessages, LocalDateTime.now());
return backup;
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

View File

@ -1,11 +0,0 @@
## Data Libraries
This module contains articles about libraries for data processing in Java.
### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- More articles: [[<-- prev]](/../libraries-data-2)
##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE. If you have issues with the derive4j imports in your IDE, you have to add the folder: *target/generated-sources/annotations* to the project build path in your IDE.

View File

@ -1,64 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>libraries-data-3</artifactId>
<name>libraries-data-3</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<assertj.version>3.6.2</assertj.version>
<slf4j.version>1.7.25</slf4j.version>
<kafka.version>2.8.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
</properties>
</project>

View File

@ -3,14 +3,10 @@
This module contains articles about libraries for data processing in Java.
### Relevant articles
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to JCache](https://www.baeldung.com/jcache)
- [A Guide to Apache Ignite](https://www.baeldung.com/apache-ignite)
- [Apache Ignite with Spring Data](https://www.baeldung.com/apache-ignite-spring-data)
- [A Guide to Apache Crunch](https://www.baeldung.com/apache-crunch)
- [Intro to Apache Storm](https://www.baeldung.com/apache-storm)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Guide to JMapper](https://www.baeldung.com/jmapper)
More articles: [[next -->]](/../libraries-data-2)

View File

@ -50,7 +50,7 @@ public class QueryXmlResourceWithConcurrentAxisIntegrationTest {
@Test
public void createDatabaseAndXMarkResourceAndCheckQuery() throws IOException {
final var pathToXmlFile = XML_DIRECTORY.resolve("10mb.xml");
final var pathToXmlFile = XML_DIRECTORY.resolve("regions.xml");
// Create an empty XML database.
Databases.createXmlDatabase(new DatabaseConfiguration(DATABASE_PATH));

File diff suppressed because it is too large Load Diff

View File

@ -4,3 +4,5 @@
### Relevant Articles:
- [Spring Data with ArangoDB](https://www.baeldung.com/spring-data-arangodb)

View File

@ -124,7 +124,7 @@ public class ZipsAggregationLiveTest {
}
@Test
public void whenStateWithLowestAvgCityPopIsND_theSuccess() {
public void whenStateWithLowestAvgCityPopIsME_theSuccess() {
GroupOperation sumTotalCityPop = group("state", "city").sum("pop").as("cityPop");
GroupOperation averageStatePop = group("_id.state").avg("cityPop").as("avgCityPop");
@ -138,13 +138,12 @@ public class ZipsAggregationLiveTest {
AggregationResults<StatePopulation> result = mongoTemplate.aggregate(aggregation, "zips", StatePopulation.class);
StatePopulation smallestState = result.getUniqueMappedResult();
assertEquals("ND", smallestState.getState());
assertTrue(smallestState.getStatePop()
.equals(1645));
assertEquals("ME", smallestState.getState());
assertEquals(3676, smallestState.getStatePop().longValue());
}
@Test
public void whenMaxTXAndMinDC_theSuccess() {
public void whenMaxMAAndMinRI_theSuccess() {
GroupOperation sumZips = group("state").count().as("zipCount");
SortOperation sortByCount = sort(Direction.ASC, "zipCount");
@ -157,10 +156,10 @@ public class ZipsAggregationLiveTest {
AggregationResults<Document> result = mongoTemplate.aggregate(aggregation, "zips", Document.class);
Document document = result.getUniqueMappedResult();
assertEquals("DC", document.get("minZipState"));
assertEquals(24, document.get("minZipCount"));
assertEquals("TX", document.get("maxZipState"));
assertEquals(1671, document.get("maxZipCount"));
assertEquals("RI", document.get("minZipState"));
assertEquals(69, document.get("minZipCount"));
assertEquals("MA", document.get("maxZipState"));
assertEquals(474, document.get("maxZipCount"));
}
}

File diff suppressed because it is too large Load Diff

View File

@ -345,6 +345,7 @@
<module>antlr</module>
<module>apache-cxf</module>
<module>apache-kafka</module>
<module>apache-libraries</module>
<module>apache-olingo/olingo2</module>
<module>apache-poi</module>
@ -815,6 +816,7 @@
<module>antlr</module>
<module>apache-cxf</module>
<module>apache-kafka</module>
<module>apache-libraries</module>
<module>apache-olingo/olingo2</module>
<module>apache-poi</module>

View File

@ -24,10 +24,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@ -54,11 +50,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>

View File

@ -1,25 +0,0 @@
package com.baeldung.reactive;
import com.mongodb.reactivestreams.client.MongoClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
@SpringBootApplication
public class Spring5ReactiveApplication{
public static void main(String[] args) {
SpringApplication.run(Spring5ReactiveApplication.class, args);
}
@Autowired
MongoClient mongoClient;
@Bean
public ReactiveMongoTemplate reactiveMongoTemplate() {
return new ReactiveMongoTemplate(mongoClient, "test");
}
}

View File

@ -1,21 +0,0 @@
package com.baeldung.reactive.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Account {
@Id
private String id;
private String owner;
private Double value;
}

View File

@ -1,15 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.model.Account;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Repository
public interface AccountCrudRepository extends ReactiveCrudRepository<Account, String> {
public Flux<Account> findAllByValue(Double value);
public Mono<Account> findFirstByOwner(Mono<String> owner);
}

View File

@ -1,7 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.model.Account;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface AccountMongoRepository extends ReactiveMongoRepository<Account, String> {
}

View File

@ -1,15 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.model.Account;
import io.reactivex.Observable;
import io.reactivex.Single;
import org.springframework.data.repository.reactive.RxJava2CrudRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface AccountRxJavaRepository extends RxJava2CrudRepository<Account, String>{
public Observable<Account> findAllByValue(Double value);
public Single<Account> findFirstByOwner(Single<String> owner);
}

View File

@ -1,33 +0,0 @@
package com.baeldung.reactive.template;
import com.baeldung.reactive.model.Account;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.ReactiveRemoveOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class AccountTemplateOperations {
@Autowired
ReactiveMongoTemplate template;
public Mono<Account> findById(String id) {
return template.findById(id, Account.class);
}
public Flux<Account> findAll() {
return template.findAll(Account.class);
}
public Mono<Account> save(Mono<Account> account) {
return template.save(account);
}
public ReactiveRemoveOperation.ReactiveRemove<Account> deleteAll() {
return template.remove(Account.class);
}
}

View File

@ -1,11 +0,0 @@
package com.baeldung.tailablecursor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LogsCounterApplication {
public static void main(String[] args) {
SpringApplication.run(LogsCounterApplication.class, args);
}
}

View File

@ -1,21 +0,0 @@
package com.baeldung.tailablecursor.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@Document
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Log {
@Id
private String id;
private String service;
private LogLevel level;
private String message;
}

View File

@ -1,5 +0,0 @@
package com.baeldung.tailablecursor.domain;
public enum LogLevel {
ERROR, WARN, INFO
}

View File

@ -1,12 +0,0 @@
package com.baeldung.tailablecursor.repository;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import org.springframework.data.mongodb.repository.Tailable;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
@Tailable
Flux<Log> findByLevel(LogLevel level);
}

View File

@ -1,62 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest;
import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicInteger;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
@Slf4j
public class ErrorLogsCounter implements LogsCounter {
private static final String LEVEL_FIELD_NAME = "level";
private final String collectionName;
private final MessageListenerContainer container;
private final AtomicInteger counter = new AtomicInteger();
public ErrorLogsCounter(MongoTemplate mongoTemplate,
String collectionName) {
this.collectionName = collectionName;
this.container = new DefaultMessageListenerContainer(mongoTemplate);
container.start();
TailableCursorRequest<Log> request = getTailableCursorRequest();
container.register(request, Log.class);
}
@SuppressWarnings("unchecked")
private TailableCursorRequest<Log> getTailableCursorRequest() {
MessageListener<Document, Log> listener = message -> {
log.info("ERROR log received: {}", message.getBody());
counter.incrementAndGet();
};
return TailableCursorRequest.builder()
.collection(collectionName)
.filter(query(where(LEVEL_FIELD_NAME).is(LogLevel.ERROR)))
.publishTo(listener)
.build();
}
@Override
public int count() {
return counter.get();
}
@PreDestroy
public void close() {
container.stop();
}
}

View File

@ -1,36 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import com.baeldung.tailablecursor.repository.LogsRepository;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class InfoLogsCounter implements LogsCounter {
private final AtomicInteger counter = new AtomicInteger();
private final Disposable subscription;
public InfoLogsCounter(LogsRepository repository) {
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
this.subscription = stream.subscribe(logEntity -> {
log.info("INFO log received: " + logEntity);
counter.incrementAndGet();
});
}
@Override
public int count() {
return this.counter.get();
}
@PreDestroy
public void close() {
this.subscription.dispose();
}
}

View File

@ -1,5 +0,0 @@
package com.baeldung.tailablecursor.service;
public interface LogsCounter {
int count();
}

View File

@ -1,41 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicInteger;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
@Slf4j
public class WarnLogsCounter implements LogsCounter {
private static final String LEVEL_FIELD_NAME = "level";
private final AtomicInteger counter = new AtomicInteger();
private final Disposable subscription;
public WarnLogsCounter(ReactiveMongoOperations template) {
Flux<Log> stream = template.tail(query(where(LEVEL_FIELD_NAME).is(LogLevel.WARN)), Log.class);
subscription = stream.subscribe(logEntity -> {
log.warn("WARN log received: " + logEntity);
counter.incrementAndGet();
});
}
@Override
public int count() {
return counter.get();
}
@PreDestroy
public void close() {
subscription.dispose();
}
}

View File

@ -1,17 +0,0 @@
package com.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.baeldung.reactive.Spring5ReactiveApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Spring5ReactiveApplication.class)
public class SpringContextTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@ -1,70 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.Spring5ReactiveApplication;
import com.baeldung.reactive.model.Account;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class AccountCrudRepositoryManualTest {
@Autowired
AccountCrudRepository repository;
@Test
public void givenValue_whenFindAllByValue_thenFindAccount() {
repository.save(new Account(null, "Bill", 12.3)).block();
Flux<Account> accountFlux = repository.findAllByValue(12.3);
StepVerifier.create(accountFlux)
.assertNext(account -> {
assertEquals("Bill", account.getOwner());
assertEquals(Double.valueOf(12.3) , account.getValue());
assertNotNull(account.getId());
})
.expectComplete()
.verify();
}
@Test
public void givenOwner_whenFindFirstByOwner_thenFindAccount() {
repository.save(new Account(null, "Bill", 12.3)).block();
Mono<Account> accountMono = repository.findFirstByOwner(Mono.just("Bill"));
StepVerifier.create(accountMono)
.assertNext(account -> {
assertEquals("Bill", account.getOwner());
assertEquals(Double.valueOf(12.3) , account.getValue());
assertNotNull(account.getId());
})
.expectComplete()
.verify();
}
@Test
public void givenAccount_whenSave_thenSaveAccount() {
Mono<Account> accountMono = repository.save(new Account(null, "Bill", 12.3));
StepVerifier
.create(accountMono)
.assertNext(account -> assertNotNull(account.getId()))
.expectComplete()
.verify();
}
}

View File

@ -1,67 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.Spring5ReactiveApplication;
import com.baeldung.reactive.model.Account;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.ExampleMatcher;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.startsWith;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class AccountMongoRepositoryManualTest {
@Autowired
AccountMongoRepository repository;
@Test
public void givenExample_whenFindAllWithExample_thenFindAllMacthings() {
repository.save(new Account(null, "john", 12.3)).block();
ExampleMatcher matcher = ExampleMatcher.matching().withMatcher("owner", startsWith());
Example<Account> example = Example.of(new Account(null, "jo", null), matcher);
Flux<Account> accountFlux = repository.findAll(example);
StepVerifier
.create(accountFlux)
.assertNext(account -> assertEquals("john", account.getOwner()))
.expectComplete()
.verify();
}
@Test
public void givenAccount_whenSave_thenSave() {
Mono<Account> accountMono = repository.save(new Account(null, "john", 12.3));
StepVerifier
.create(accountMono)
.assertNext(account -> assertNotNull(account.getId()))
.expectComplete()
.verify();
}
@Test
public void givenId_whenFindById_thenFindAccount() {
Account inserted = repository.save(new Account(null, "john", 12.3)).block();
Mono<Account> accountMono = repository.findById(inserted.getId());
StepVerifier
.create(accountMono)
.assertNext(account -> {
assertEquals("john", account.getOwner());
assertEquals(Double.valueOf(12.3), account.getValue());
assertNotNull(account.getId());
})
.expectComplete()
.verify();
}
}

View File

@ -1,58 +0,0 @@
package com.baeldung.reactive.repository;
import com.baeldung.reactive.Spring5ReactiveApplication;
import com.baeldung.reactive.model.Account;
import io.reactivex.Observable;
import io.reactivex.Single;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class AccountRxJavaRepositoryManualTest {
@Autowired
AccountRxJavaRepository repository;
@Test
public void givenValue_whenFindAllByValue_thenFindAccounts() throws InterruptedException {
repository.save(new Account(null, "bruno", 12.3)).blockingGet();
Observable<Account> accountObservable = repository.findAllByValue(12.3);
accountObservable
.test()
.await()
.assertComplete()
.assertValueAt(0, account -> {
assertEquals("bruno", account.getOwner());
assertEquals(Double.valueOf(12.3), account.getValue());
return true;
});
}
@Test
public void givenOwner_whenFindFirstByOwner_thenFindAccount() throws InterruptedException {
repository.save(new Account(null, "bruno", 12.3)).blockingGet();
Single<Account> accountSingle = repository.findFirstByOwner(Single.just("bruno"));
accountSingle
.test()
.await()
.assertComplete()
.assertValueAt(0, account -> {
assertEquals("bruno", account.getOwner());
assertEquals(Double.valueOf(12.3), account.getValue());
assertNotNull(account.getId());
return true;
});
}
}

View File

@ -1,48 +0,0 @@
package com.baeldung.reactive.template;
import com.baeldung.reactive.Spring5ReactiveApplication;
import com.baeldung.reactive.model.Account;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class AccountTemplateOperationsManualTest {
@Autowired
AccountTemplateOperations accountTemplate;
@Test
public void givenAccount_whenSave_thenSave() {
Account account = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
assertNotNull( account.getId() );
}
@Test
public void givenId_whenFindById_thenFindAccount() {
Mono<Account> accountMono = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3)));
Mono<Account> accountMonoResult = accountTemplate.findById(accountMono.block().getId());
assertNotNull(accountMonoResult.block().getId());
assertEquals(accountMonoResult.block().getOwner(), "Raul");
}
@Test
public void whenFindAll_thenFindAllAccounts() {
Account account1 = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
Account account2 = accountTemplate.save(Mono.just(new Account(null, "Raul Torres", 13.3))).block();
Flux<Account> accountFlux = accountTemplate.findAll();
List<Account> accounts = accountFlux.collectList().block();
assertTrue(accounts.stream().anyMatch(x -> account1.getId().equals(x.getId()) ));
assertTrue(accounts.stream().anyMatch(x -> account2.getId().equals(x.getId()) ));
}
}

View File

@ -1,112 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.util.SocketUtils;
import java.io.IOException;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class ErrorLogsCounterManualTest {
private static final String SERVER = "localhost";
private static final int PORT = SocketUtils.findAvailableTcpPort(10000);
private static final String DB_NAME = "test";
private static final String COLLECTION_NAME = Log.class.getName().toLowerCase();
private static final MongodStarter starter = MongodStarter.getDefaultInstance();
private static final int MAX_DOCUMENTS_IN_COLLECTION = 3;
private ErrorLogsCounter errorLogsCounter;
private MongodExecutable mongodExecutable;
private MongodProcess mongoDaemon;
private MongoDatabase db;
@Before
public void setup() throws Exception {
MongoTemplate mongoTemplate = initMongoTemplate();
MongoCollection<Document> collection = createCappedCollection();
persistDocument(collection, -1, LogLevel.ERROR, "my-service", "Initial log");
errorLogsCounter = new ErrorLogsCounter(mongoTemplate, COLLECTION_NAME);
Thread.sleep(1000L); // wait for initialization
}
private MongoTemplate initMongoTemplate() throws IOException {
mongodExecutable = starter.prepare(new MongodConfigBuilder()
.version(Version.Main.PRODUCTION)
.net(new Net(SERVER, PORT, Network.localhostIsIPv6()))
.build());
mongoDaemon = mongodExecutable.start();
MongoClient mongoClient = new MongoClient(SERVER, PORT);
db = mongoClient.getDatabase(DB_NAME);
return new MongoTemplate(mongoClient, DB_NAME);
}
private MongoCollection<Document> createCappedCollection() {
db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
.capped(true)
.sizeInBytes(100000)
.maxDocuments(MAX_DOCUMENTS_IN_COLLECTION));
return db.getCollection(COLLECTION_NAME);
}
private void persistDocument(MongoCollection<Document> collection,
int i, LogLevel level, String service, String message) {
Document logMessage = new Document();
logMessage.append("_id", i);
logMessage.append("level", level.toString());
logMessage.append("service", service);
logMessage.append("message", message);
collection.insertOne(logMessage);
}
@After
public void tearDown() {
errorLogsCounter.close();
mongoDaemon.stop();
mongodExecutable.stop();
}
@Test
public void whenErrorLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
MongoCollection<Document> collection = db.getCollection(COLLECTION_NAME);
IntStream.range(1, 10)
.forEach(i -> persistDocument(collection,
i,
i > 5 ? LogLevel.ERROR : LogLevel.INFO,
"service" + i,
"Message from service " + i)
);
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
assertThat(collection.countDocuments(), is((long) MAX_DOCUMENTS_IN_COLLECTION));
assertThat(errorLogsCounter.count(), is(5));
}
}

View File

@ -1,75 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.LogsCounterApplication;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import com.baeldung.tailablecursor.repository.LogsRepository;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = LogsCounterApplication.class)
@Slf4j
public class InfoLogsCounterManualTest {
@Autowired
private LogsRepository repository;
@Autowired
private ReactiveMongoTemplate template;
@Before
public void setUp() {
createCappedCollectionUsingReactiveMongoTemplate(template);
persistDocument(Log.builder()
.level(LogLevel.INFO)
.service("Service 2")
.message("Initial INFO message")
.build());
}
private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
reactiveMongoTemplate.dropCollection(Log.class).block();
reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
.maxDocuments(5)
.size(1024 * 1024L)
.capped()).block();
}
private void persistDocument(Log log) {
repository.save(log).block();
}
@Test
public void wheInfoLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
InfoLogsCounter infoLogsCounter = new InfoLogsCounter(repository);
Thread.sleep(1000L); // wait for initialization
Flux.range(0,10)
.map(i -> Log.builder()
.level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
.service("some-service")
.message("some log message")
.build())
.map(entity -> repository.save(entity).subscribe())
.blockLast();
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
assertThat(infoLogsCounter.count(), is(7));
infoLogsCounter.close();
}
}

View File

@ -1,75 +0,0 @@
package com.baeldung.tailablecursor.service;
import com.baeldung.tailablecursor.LogsCounterApplication;
import com.baeldung.tailablecursor.domain.Log;
import com.baeldung.tailablecursor.domain.LogLevel;
import com.baeldung.tailablecursor.repository.LogsRepository;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = LogsCounterApplication.class)
@Slf4j
public class WarnLogsCounterManualTest {
@Autowired
private LogsRepository repository;
@Autowired
private ReactiveMongoTemplate template;
@Before
public void setUp() {
createCappedCollectionUsingReactiveMongoTemplate(template);
persistDocument(Log.builder()
.level(LogLevel.WARN)
.service("Service 1")
.message("Initial Warn message")
.build());
}
private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
reactiveMongoTemplate.dropCollection(Log.class).block();
reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
.maxDocuments(5)
.size(1024 * 1024L)
.capped()).block();
}
private void persistDocument(Log log) {
repository.save(log).block();
}
@Test
public void whenWarnLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
WarnLogsCounter warnLogsCounter = new WarnLogsCounter(template);
Thread.sleep(1000L); // wait for initialization
Flux.range(0,10)
.map(i -> Log.builder()
.level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
.service("some-service")
.message("some log message")
.build())
.map(entity -> repository.save(entity).subscribe())
.blockLast();
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
assertThat(warnLogsCounter.count(), is(5));
warnLogsCounter.close();
}
}

View File

@ -14,7 +14,7 @@ import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RibbonClientApp.class)
public class RibbonRetryFailureIntegrationTest {
public class RibbonRetryFailureManualTest {
private static ConfigurableApplicationContext weatherServiceInstance1;
private static ConfigurableApplicationContext weatherServiceInstance2;

View File

@ -15,7 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RibbonClientApp.class)
public class RibbonRetrySuccessIntegrationTest {
public class RibbonRetrySuccessManualTest {
private static ConfigurableApplicationContext weatherServiceInstance1;
private static ConfigurableApplicationContext weatherServiceInstance2;

View File

@ -38,7 +38,7 @@
<module>spring-security-web-mvc-custom</module>
<module>spring-security-web-mvc</module>
<module>spring-security-web-persistent-login</module>
<!-- <module>spring-security-web-react</module> --> <!-- Fixing in JAVA-6216 -->
<module>spring-security-web-react</module>
<module>spring-security-web-rest-basic-auth</module>
<module>spring-security-web-rest-custom</module>
<module>spring-security-web-rest</module>
@ -49,4 +49,4 @@
<module>spring-social-login</module>
</modules>
</project>
</project>

View File

@ -1,36 +0,0 @@
package com.baeldung.xss;
public class Person {
private String firstName;
private String lastName;
private int age;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Person {" + "firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + ", age=" + age + '}';
}
}

View File

@ -1,31 +0,0 @@
package com.baeldung.xss;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.Map;
@RestController
@RequestMapping("/personService")
public class PersonController {
@PostMapping(value = "/person")
private ResponseEntity<String> savePerson(@RequestHeader Map<String, String> headers,
@RequestParam String param, @RequestBody Person body) {
ObjectNode response = JsonNodeFactory.instance.objectNode();
headers.forEach((key, value) -> response.put(key, value));
response.put("firstName", body.getFirstName());
response.put("lastName", body.getLastName());
response.put("age", body.getAge());
response.put("param", param);
return new ResponseEntity<String>(response.toString(), HttpStatus.OK);
}
}

Some files were not shown because too many files have changed in this diff Show More