Merge pull request #1 from eugenp/master

Merge
This commit is contained in:
Krzysztof Majewski 2021-08-20 12:17:38 +02:00 committed by GitHub
commit 93c45fc02e
153 changed files with 12940 additions and 2556 deletions

18
apache-kafka/README.md Normal file
View File

@ -0,0 +1,18 @@
## Apache Kafka
This module contains articles about Apache Kafka.
### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE.

180
apache-kafka/pom.xml Normal file
View File

@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>apache-kafka</artifactId>
<name>apache-kafka</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-proxy</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>${com.datastax.spark.spark-cassandra-connector.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.6.2</assertj.version>
<kafka.version>2.8.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<flink.version>1.5.0</flink.version>
<awaitility.version>3.0.0</awaitility.version>
<guava.version>29.0-jre</guava.version>
<org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
</properties>
</project>

View File

@ -0,0 +1,70 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
public static void capitalize() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
stringInputStream.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
inputMessagesStream.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void main(String[] args) throws Exception {
createBackup();
}
}

View File

@ -9,23 +9,20 @@ import java.util.Properties;
public class Consumers {
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
return consumer;
}
return consumer;
}
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
topic, new InputMessageDeserializationSchema(),properties);
properties.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
return consumer;
}

View File

@ -18,6 +18,7 @@ public class InputMessage {
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
@ -55,8 +56,10 @@ public class InputMessage {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
InputMessage message1 = (InputMessage) o;
return Objects.equal(sender, message1.sender) &&
Objects.equal(recipient, message1.recipient) &&

View File

@ -0,0 +1,34 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
Backup backup = new Backup(inputMessages, LocalDateTime.now());
return backup;
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

View File

@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate
@Override
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
return element.getSentAt()
.atZone(zoneId)
.toEpochSecond() * 1000;
}
@Nullable

View File

@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BackupSerializationSchema
implements SerializationSchema<Backup> {
public class BackupSerializationSchema implements SerializationSchema<Backup> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@ -18,7 +17,7 @@ public class BackupSerializationSchema
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
if (objectMapper == null) {
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
}

View File

@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {

View File

@ -27,11 +27,11 @@ public class KafkaTopicApplication {
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic));
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
// get the async result for the new topic creation
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
// call get() to block until topic creation has completed or failed
future.get();
@ -47,15 +47,13 @@ public class KafkaTopicApplication {
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
.validateOnly(true)
.retryOnQuotaViolation(true);
CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true)
.retryOnQuotaViolation(true);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic), topicOptions
);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions);
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
future.get();
}
}
@ -72,14 +70,12 @@ public class KafkaTopicApplication {
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
.configs(newTopicConfig);
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic)
);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
KafkaFuture<Void> future = result.values().get(topicName);
KafkaFuture<Void> future = result.values()
.get(topicName);
future.get();
}
}

View File

@ -19,9 +19,7 @@ public class CountryPopulationConsumer {
private java.util.function.Consumer<Throwable> exceptionConsumer;
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
public CountryPopulationConsumer(
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
public CountryPopulationConsumer(Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer, java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
this.consumer = consumer;
this.exceptionConsumer = exceptionConsumer;
this.countryPopulationConsumer = countryPopulationConsumer;

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -24,16 +24,16 @@ public class TransactionalMessageProducer {
producer.initTransactions();
try{
try {
producer.beginTransaction();
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
new ProducerRecord<String, String>("input", null, s)));
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
.forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
producer.commitTransaction();
}catch (KafkaException e){
} catch (KafkaException e) {
producer.abortTransaction();

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -43,10 +43,11 @@ public class TransactionalWordCount {
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
.stream()
.flatMap(record -> Stream.of(record.value().split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
.stream()
.flatMap(record -> Stream.of(record.value()
.split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
producer.beginTransaction();
@ -56,7 +57,8 @@ public class TransactionalWordCount {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
long offset = partitionedRecords.get(partitionedRecords.size() - 1)
.offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}
@ -72,7 +74,6 @@ public class TransactionalWordCount {
}
}
private static KafkaConsumer<String, String> createKafkaConsumer() {

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka;
package com.baeldung.kafka.exactlyonce;
public class Tuple {
@ -10,8 +10,8 @@ public class Tuple {
this.value = value;
}
public static Tuple of(String key, Integer value){
return new Tuple(key,value);
public static Tuple of(String key, Integer value) {
return new Tuple(key, value);
}
public String getKey() {

View File

@ -15,8 +15,7 @@ public class KafkaProducer {
}
public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news",
key, value);
ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
return producer.send(record);
}
@ -36,5 +35,4 @@ public class KafkaProducer {
producer.commitTransaction();
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung.kafka.streams;
package com.baeldung.kafka.streamsvsconsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;

View File

@ -1,61 +1,78 @@
package com.baeldung.kafkastreams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.Ignore;
import org.junit.Test;
public class KafkaStreamsLiveTest {
private String bootstrapServers = "localhost:9092";
private Path stateDirectory;
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
//given
// given
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
.getClass()
.getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
.getClass()
.getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
//when
KStreamBuilder builder = new KStreamBuilder();
// Use a temporary directory for storing state, which will be automatically removed after the test.
try {
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
.toString());
} catch (final IOException e) {
throw new UncheckedIOException("Cannot create temporary directory", e);
}
// when
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
//then
// then
Thread.sleep(30000);
streams.close();
}

View File

@ -1,48 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>core-java-16</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>core-java-16</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>core-java-16</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>core-java-16</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${maven.compiler.source.version}</source>
<target>${maven.compiler.target.version}</target>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${maven.compiler.source.version}</source>
<target>${maven.compiler.target.version}</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source.version>16</maven.compiler.source.version>
<maven.compiler.target.version>16</maven.compiler.target.version>
<assertj.version>3.6.1</assertj.version>
</properties>
<properties>
<maven.compiler.source.version>16</maven.compiler.source.version>
<maven.compiler.target.version>16</maven.compiler.target.version>
<assertj.version>3.6.1</assertj.version>
</properties>
</project>

View File

@ -0,0 +1,38 @@
package com.baeldung.java_16_features.groupingby;
public class BlogPost {
private String title;
private String author;
private BlogPostType type;
private int likes;
record AuthPostTypesLikes(String author, BlogPostType type, int likes) {};
public BlogPost(String title, String author, BlogPostType type, int likes) {
this.title = title;
this.author = author;
this.type = type;
this.likes = likes;
}
public String getTitle() {
return title;
}
public String getAuthor() {
return author;
}
public BlogPostType getType() {
return type;
}
public int getLikes() {
return likes;
}
@Override
public String toString() {
return "BlogPost{" + "title='" + title + '\'' + ", type=" + type + ", likes=" + likes + '}';
}
}

View File

@ -0,0 +1,5 @@
package com.baeldung.java_16_features.groupingby;
public enum BlogPostType {
NEWS, REVIEW, GUIDE
}

View File

@ -0,0 +1,41 @@
package com.baeldung.java_16_features.groupingby;
import java.util.Objects;
public class Tuple {
private final BlogPostType type;
private final String author;
public Tuple(BlogPostType type, String author) {
this.type = type;
this.author = author;
}
public BlogPostType getType() {
return type;
}
public String getAuthor() {
return author;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Tuple tuple = (Tuple) o;
return type == tuple.type && author.equals(tuple.author);
}
@Override
public int hashCode() {
return Objects.hash(type, author);
}
@Override
public String toString() {
return "Tuple{" + "type=" + type + ", author='" + author + '\'' + '}';
}
}

View File

@ -0,0 +1,254 @@
package com.baeldung.java_16_features.groupingby;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.averagingInt;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.groupingByConcurrent;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.maxBy;
import static java.util.stream.Collectors.summarizingInt;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
public class JavaGroupingByCollectorUnitTest {
private static final List<BlogPost> posts = Arrays.asList(new BlogPost("News item 1", "Author 1", BlogPostType.NEWS, 15), new BlogPost("Tech review 1", "Author 2", BlogPostType.REVIEW, 5),
new BlogPost("Programming guide", "Author 1", BlogPostType.GUIDE, 20), new BlogPost("News item 2", "Author 2", BlogPostType.NEWS, 35), new BlogPost("Tech review 2", "Author 1", BlogPostType.REVIEW, 15));
@Test
public void givenAListOfPosts_whenGroupedByType_thenGetAMapBetweenTypeAndPosts() {
Map<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndTheirTitlesAreJoinedInAString_thenGetAMapBetweenTypeAndCsvTitles() {
Map<BlogPostType, String> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, mapping(BlogPost::getTitle, joining(", ", "Post titles: [", "]"))));
assertEquals("Post titles: [News item 1, News item 2]", postsPerType.get(BlogPostType.NEWS));
assertEquals("Post titles: [Programming guide]", postsPerType.get(BlogPostType.GUIDE));
assertEquals("Post titles: [Tech review 1, Tech review 2]", postsPerType.get(BlogPostType.REVIEW));
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndSumTheLikes_thenGetAMapBetweenTypeAndPostLikes() {
Map<BlogPostType, Integer> likesPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, summingInt(BlogPost::getLikes)));
assertEquals(50, likesPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(20, likesPerType.get(BlogPostType.REVIEW)
.intValue());
assertEquals(20, likesPerType.get(BlogPostType.GUIDE)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeInAnEnumMap_thenGetAnEnumMapBetweenTypeAndPosts() {
EnumMap<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, () -> new EnumMap<>(BlogPostType.class), toList()));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeInSets_thenGetAMapBetweenTypesAndSetsOfPosts() {
Map<BlogPostType, Set<BlogPost>> postsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, toSet()));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeConcurrently_thenGetAMapBetweenTypeAndPosts() {
ConcurrentMap<BlogPostType, List<BlogPost>> postsPerType = posts.parallelStream()
.collect(groupingByConcurrent(BlogPost::getType));
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
.size());
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
.size());
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
.size());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndAveragingLikes_thenGetAMapBetweenTypeAndAverageNumberOfLikes() {
Map<BlogPostType, Double> averageLikesPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, averagingInt(BlogPost::getLikes)));
assertEquals(25, averageLikesPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(20, averageLikesPerType.get(BlogPostType.GUIDE)
.intValue());
assertEquals(10, averageLikesPerType.get(BlogPostType.REVIEW)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndCounted_thenGetAMapBetweenTypeAndNumberOfPosts() {
Map<BlogPostType, Long> numberOfPostsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, counting()));
assertEquals(2, numberOfPostsPerType.get(BlogPostType.NEWS)
.intValue());
assertEquals(1, numberOfPostsPerType.get(BlogPostType.GUIDE)
.intValue());
assertEquals(2, numberOfPostsPerType.get(BlogPostType.REVIEW)
.intValue());
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndMaxingLikes_thenGetAMapBetweenTypeAndMaximumNumberOfLikes() {
Map<BlogPostType, Optional<BlogPost>> maxLikesPerPostType = posts.stream()
.collect(groupingBy(BlogPost::getType, maxBy(comparingInt(BlogPost::getLikes))));
assertTrue(maxLikesPerPostType.get(BlogPostType.NEWS)
.isPresent());
assertEquals(35, maxLikesPerPostType.get(BlogPostType.NEWS)
.get()
.getLikes());
assertTrue(maxLikesPerPostType.get(BlogPostType.GUIDE)
.isPresent());
assertEquals(20, maxLikesPerPostType.get(BlogPostType.GUIDE)
.get()
.getLikes());
assertTrue(maxLikesPerPostType.get(BlogPostType.REVIEW)
.isPresent());
assertEquals(15, maxLikesPerPostType.get(BlogPostType.REVIEW)
.get()
.getLikes());
}
@Test
public void givenAListOfPosts_whenGroupedByAuthorAndThenByType_thenGetAMapBetweenAuthorAndMapsBetweenTypeAndBlogPosts() {
Map<String, Map<BlogPostType, List<BlogPost>>> map = posts.stream()
.collect(groupingBy(BlogPost::getAuthor, groupingBy(BlogPost::getType)));
assertEquals(1, map.get("Author 1")
.get(BlogPostType.NEWS)
.size());
assertEquals(1, map.get("Author 1")
.get(BlogPostType.GUIDE)
.size());
assertEquals(1, map.get("Author 1")
.get(BlogPostType.REVIEW)
.size());
assertEquals(1, map.get("Author 2")
.get(BlogPostType.NEWS)
.size());
assertEquals(1, map.get("Author 2")
.get(BlogPostType.REVIEW)
.size());
assertNull(map.get("Author 2")
.get(BlogPostType.GUIDE));
}
@Test
public void givenAListOfPosts_whenGroupedByTypeAndSummarizingLikes_thenGetAMapBetweenTypeAndSummary() {
Map<BlogPostType, IntSummaryStatistics> likeStatisticsPerType = posts.stream()
.collect(groupingBy(BlogPost::getType, summarizingInt(BlogPost::getLikes)));
IntSummaryStatistics newsLikeStatistics = likeStatisticsPerType.get(BlogPostType.NEWS);
assertEquals(2, newsLikeStatistics.getCount());
assertEquals(50, newsLikeStatistics.getSum());
assertEquals(25.0, newsLikeStatistics.getAverage(), 0.001);
assertEquals(35, newsLikeStatistics.getMax());
assertEquals(15, newsLikeStatistics.getMin());
}
@Test
public void givenAListOfPosts_whenGroupedByComplexMapPairKeyType_thenGetAMapBetweenPairAndList() {
Map<Pair<BlogPostType, String>, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new ImmutablePair<>(post.getType(), post.getAuthor())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new ImmutablePair<>(BlogPostType.GUIDE, "Author 1"));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
}
@Test
public void givenAListOfPosts_whenGroupedByComplexMapKeyType_thenGetAMapBetweenTupleAndList() {
Map<Tuple, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new Tuple(post.getType(), post.getAuthor())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new Tuple(BlogPostType.GUIDE, "Author 1"));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
}
@Test
public void givenAListOfPosts_whenGroupedByRecord_thenGetAMapBetweenRecordAndList() {
Map<BlogPost.AuthPostTypesLikes, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
.collect(groupingBy(post -> new BlogPost.AuthPostTypesLikes(post.getAuthor(), post.getType(), post.getLikes())));
List<BlogPost> result = postsPerTypeAndAuthor.get(new BlogPost.AuthPostTypesLikes("Author 1", BlogPostType.GUIDE, 20));
assertThat(result.size()).isEqualTo(1);
BlogPost blogPost = result.get(0);
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
assertThat(blogPost.getLikes()).isEqualTo(20);
}
}

View File

@ -189,21 +189,8 @@ public class StopExecution {
longRunningSort();
}
private void longRunningOperation() {
LOG.info("long Running operation started");
try {
//Thread.sleep(500);
longFileRead();
LOG.info("long running operation finished");
} catch (InterruptedException e) {
LOG.info("long Running operation interrupted");
}
}
private void longRunningSort() {
LOG.info("long Running task started");
// Do you long running calculation here
LOG.info("Long running task started");
int len = 100000;
List<Integer> numbers = new ArrayList<>();
try {
@ -229,25 +216,7 @@ public class StopExecution {
LOG.info("Index position: " + i);
LOG.info("Long running task finished");
} catch (InterruptedException e) {
LOG.info("long Running operation interrupted");
}
}
private void longFileRead() throws InterruptedException {
String file = "input.txt";
ClassLoader classloader = getClass().getClassLoader();
try (InputStream inputStream = classloader.getResourceAsStream(file)) {
Reader inputStreamReader = new InputStreamReader(inputStream);
int data = inputStreamReader.read();
while (data != -1) {
char theChar = (char) data;
data = inputStreamReader.read();
throwExceptionOnThreadInterrupt();
}
} catch (IOException e) {
LOG.error("Exception: ", e);
LOG.info("Long running operation interrupted");
}
}

View File

@ -13,12 +13,9 @@ import java.util.stream.IntStream;
public class TimeApi {
public static List<Date> getDatesBetweenUsingJava7(Date startDate, Date endDate) {
List<Date> datesInRange = new ArrayList<Date>();
Calendar calendar = new GregorianCalendar();
calendar.setTime(startDate);
Calendar endCalendar = new GregorianCalendar();
endCalendar.setTime(endDate);
List<Date> datesInRange = new ArrayList<>();
Calendar calendar = getCalendarWithoutTime(startDate);
Calendar endCalendar = getCalendarWithoutTime(endDate);
while (calendar.before(endCalendar)) {
Date result = calendar.getTime();
@ -40,4 +37,15 @@ public class TimeApi {
return startDate.datesUntil(endDate).collect(Collectors.toList());
}
private static Calendar getCalendarWithoutTime(Date date) {
Calendar calendar = new GregorianCalendar();
calendar.setTime(date);
calendar.set(Calendar.HOUR, 0);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
return calendar;
}
}

View File

@ -1,12 +1,13 @@
package com.baeldung.java9.time;
import org.junit.Test;
import java.time.LocalDate;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TimeApiUnitTest {
@ -18,19 +19,18 @@ public class TimeApiUnitTest {
Date endDate = endCalendar.getTime();
List<Date> dates = TimeApi.getDatesBetweenUsingJava7(startDate, endDate);
assertEquals(dates.size(), 2);
assertThat(dates).hasSize(2);
Calendar calendar = Calendar.getInstance();
Date date1 = calendar.getTime();
assertEquals(dates.get(0).getDay(), date1.getDay());
assertEquals(dates.get(0).getMonth(), date1.getMonth());
assertEquals(dates.get(0).getYear(), date1.getYear());
Date expectedDate1 = calendar.getTime();
assertThat(dates.get(0)).isInSameDayAs(expectedDate1);
assertThatTimeFieldsAreZero(dates.get(0));
calendar.add(Calendar.DATE, 1);
Date date2 = calendar.getTime();
assertEquals(dates.get(1).getDay(), date2.getDay());
assertEquals(dates.get(1).getMonth(), date2.getMonth());
assertEquals(dates.get(1).getYear(), date2.getYear());
Date expectedDate2 = calendar.getTime();
assertThat(dates.get(1)).isInSameDayAs(expectedDate2);
assertThatTimeFieldsAreZero(dates.get(1));
}
@Test
@ -39,9 +39,8 @@ public class TimeApiUnitTest {
LocalDate endDate = LocalDate.now().plusDays(2);
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava8(startDate, endDate);
assertEquals(dates.size(), 2);
assertEquals(dates.get(0), LocalDate.now());
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
}
@Test
@ -50,9 +49,15 @@ public class TimeApiUnitTest {
LocalDate endDate = LocalDate.now().plusDays(2);
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava9(startDate, endDate);
assertEquals(dates.size(), 2);
assertEquals(dates.get(0), LocalDate.now());
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
}
private static void assertThatTimeFieldsAreZero(Date date) {
assertThat(date).hasHourOfDay(0);
assertThat(date).hasMinute(0);
assertThat(date).hasSecond(0);
assertThat(date).hasMillisecond(0);
}
}

View File

@ -7,3 +7,4 @@ This module contains article about constructors in Java
- [Java Copy Constructor](https://www.baeldung.com/java-copy-constructor)
- [Cannot Reference “X” Before Supertype Constructor Has Been Called](https://www.baeldung.com/java-cannot-reference-x-before-supertype-constructor-error)
- [Private Constructors in Java](https://www.baeldung.com/java-private-constructors)
- [Throwing Exceptions in Constructors](https://www.baeldung.com/java-constructors-exceptions)

View File

@ -5,3 +5,4 @@ This module contains articles about Java operators
## Relevant Articles:
- [Logical vs Bitwise OR Operator](https://www.baeldung.com/java-logical-vs-bitwise-or-operator)
- [Bitmasking in Java with Bitwise Operators](https://www.baeldung.com/java-bitmasking)

View File

@ -25,9 +25,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenGrepWithSimpleString_thenCorrect() {
int expectedLineCount = 4;
int expectedLineCount = 5;
// grep "NINETEEN" dictionary.txt
// grep "NINETEEN" dictionary.in
List<Line> lines = Unix4j.grep("NINETEEN", fileToGrep).toLineList();
assertEquals(expectedLineCount, lines.size());
@ -35,9 +35,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenInverseGrepWithSimpleString_thenCorrect() {
int expectedLineCount = 178687;
int expectedLineCount = 8;
// grep -v "NINETEEN" dictionary.txt
// grep -v "NINETEEN" dictionary.in
List<Line> lines = grep(Options.v, "NINETEEN", fileToGrep).toLineList();
assertEquals(expectedLineCount, lines.size());
@ -45,9 +45,9 @@ public class GrepWithUnix4JIntegrationTest {
@Test
public void whenGrepWithRegex_thenCorrect() {
int expectedLineCount = 151;
int expectedLineCount = 5;
// grep -c ".*?NINE.*?" dictionary.txt
// grep -c ".*?NINE.*?" dictionary.in
String patternCount = grep(Options.c, ".*?NINE.*?", fileToGrep).cut(fields, ":", 1).toStringResult();
assertEquals(expectedLineCount, Integer.parseInt(patternCount));

View File

@ -0,0 +1,13 @@
EIGHTTEEN
EIGHTTEENS
EIGHTTEENTH
EIGHTTEENTHS
NINETEEN
NINETEENS
NINETEENTH
NINETEENTHS
TWENTY
TWENTHIES
TWENTHIETH
TWENTHIETHS
TWENTYNINETEEN

View File

@ -4,3 +4,4 @@
- [Java (String) or .toString()?](https://www.baeldung.com/java-string-casting-vs-tostring)
- [Split Java String by Newline](https://www.baeldung.com/java-string-split-by-newline)
- [Split a String in Java and Keep the Delimiters](https://www.baeldung.com/java-split-string-keep-delimiters)
- [Validate String as Filename in Java](https://www.baeldung.com/java-validate-filename)

View File

@ -0,0 +1,61 @@
package com.baeldung.stringfilenamevalidaiton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
public class StringFilenameValidationUtils {
public static final Character[] INVALID_WINDOWS_SPECIFIC_CHARS = {'"', '*', ':', '<', '>', '?', '\\', '|', 0x7F};
public static final Character[] INVALID_UNIX_SPECIFIC_CHARS = {'\000'};
public static final String REGEX_PATTERN = "^[A-za-z0-9.]{1,255}$";
private StringFilenameValidationUtils() {
}
public static boolean validateStringFilenameUsingIO(String filename) throws IOException {
File file = new File(filename);
boolean created = false;
try {
created = file.createNewFile();
return created;
} finally {
if (created) {
file.delete();
}
}
}
public static boolean validateStringFilenameUsingNIO2(String filename) {
Paths.get(filename);
return true;
}
public static boolean validateStringFilenameUsingContains(String filename) {
if (filename == null || filename.isEmpty() || filename.length() > 255) {
return false;
}
return Arrays.stream(getInvalidCharsByOS())
.noneMatch(ch -> filename.contains(ch.toString()));
}
public static boolean validateStringFilenameUsingRegex(String filename) {
if (filename == null) {
return false;
}
return filename.matches(REGEX_PATTERN);
}
private static Character[] getInvalidCharsByOS() {
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
return INVALID_WINDOWS_SPECIFIC_CHARS;
} else if (os.contains("nix") || os.contains("nux") || os.contains("mac")) {
return INVALID_UNIX_SPECIFIC_CHARS;
} else {
return new Character[]{};
}
}
}

View File

@ -0,0 +1,130 @@
package com.baeldung.stringfilenamevalidaiton;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import java.io.IOException;
import java.nio.file.InvalidPathException;
import java.util.Arrays;
import java.util.stream.Stream;
import static com.baeldung.stringfilenamevalidaiton.StringFilenameValidationUtils.validateStringFilenameUsingContains;
import static com.baeldung.stringfilenamevalidaiton.StringFilenameValidationUtils.validateStringFilenameUsingIO;
import static com.baeldung.stringfilenamevalidaiton.StringFilenameValidationUtils.validateStringFilenameUsingNIO2;
import static com.baeldung.stringfilenamevalidaiton.StringFilenameValidationUtils.validateStringFilenameUsingRegex;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class StringFilenameValidationUnitTest {
private static final String CORRECT_FILENAME_PATTERN = "baeldung.txt";
@ParameterizedTest
@MethodSource("correctAlphanumericFilenamesProvider")
public void givenCorrectAlphanumericRandomFilenameString_whenValidateUsingIO_thenReturnTrue(String filename) throws IOException {
assertThat(validateStringFilenameUsingIO(filename)).isTrue();
assertThat(validateStringFilenameUsingNIO2(filename)).isTrue();
assertThat(validateStringFilenameUsingContains(filename)).isTrue();
assertThat(validateStringFilenameUsingRegex(filename)).isTrue();
}
@Test
public void givenTooLongFileNameString_whenValidate_thenIOAndCustomFailsNIO2Succeed() {
String filename = RandomStringUtils.randomAlphabetic(500);
assertThatThrownBy(() -> validateStringFilenameUsingIO(filename))
.isInstanceOf(IOException.class)
.hasMessageContaining("File name too long");
assertThat(validateStringFilenameUsingNIO2(filename)).isTrue();
assertThat(validateStringFilenameUsingContains(filename)).isFalse();
assertThat(validateStringFilenameUsingRegex(filename)).isFalse();
}
@ParameterizedTest
@NullSource
public void givenNullString_whenValidate_thenFails(String filename) {
assertThatThrownBy(() -> validateStringFilenameUsingIO(filename))
.isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> validateStringFilenameUsingNIO2(filename))
.isInstanceOf(NullPointerException.class);
assertThat(validateStringFilenameUsingContains(filename)).isFalse();
assertThat(validateStringFilenameUsingRegex(filename)).isFalse();
}
@ParameterizedTest
@EmptySource
public void givenEmptyString_whenValidate_thenIOAndCustomFailsNIO2Succeed(String filename) {
assertThatThrownBy(() -> validateStringFilenameUsingIO(filename))
.isInstanceOf(IOException.class);
assertThat(validateStringFilenameUsingNIO2(filename)).isTrue();
assertThat(validateStringFilenameUsingContains(filename)).isFalse();
assertThat(validateStringFilenameUsingRegex(filename)).isFalse();
}
@ParameterizedTest
@EnabledOnOs({OS.LINUX, OS.MAC})
@MethodSource("filenamesWithInvalidWindowsChars")
public void givenFilenameStringWithInvalidWindowsCharAndIsUnix_whenValidateUsingIO_thenReturnTrue(String filename) throws IOException {
assertThat(validateStringFilenameUsingIO(filename)).isTrue();
assertThat(validateStringFilenameUsingNIO2(filename)).isTrue();
assertThat(validateStringFilenameUsingContains(filename)).isTrue();
}
@ParameterizedTest
@EnabledOnOs(OS.WINDOWS)
@MethodSource("filenamesWithInvalidWindowsChars")
public void givenFilenameStringWithInvalidWindowsCharAndIsWindows_whenValidateUsingIO_thenRaiseException(String filename) {
assertThatThrownBy(() -> validateStringFilenameUsingIO(filename))
.isInstanceOf(IOException.class)
.hasMessageContaining("Invalid file path");
assertThatThrownBy(() -> validateStringFilenameUsingNIO2(filename))
.isInstanceOf(InvalidPathException.class)
.hasMessage("character not allowed");
assertThat(validateStringFilenameUsingContains(filename)).isFalse();
}
@ParameterizedTest
@EnabledOnOs({OS.LINUX, OS.MAC})
@MethodSource("filenamesWithInvalidUnixChars")
public void givenFilenameStringWithInvalidUnixCharAndIsUnix_whenValidate_thenRaiseException(String filename) {
assertThatThrownBy(() -> validateStringFilenameUsingIO(filename))
.isInstanceOf(IOException.class)
.hasMessageContaining("Invalid file path");
assertThatThrownBy(() -> validateStringFilenameUsingNIO2(filename))
.isInstanceOf(InvalidPathException.class)
.hasMessageContaining("character not allowed");
assertThat(validateStringFilenameUsingContains(filename)).isFalse();
}
private static Stream<String> correctAlphanumericFilenamesProvider() {
return Stream.generate(() -> RandomStringUtils.randomAlphanumeric(1, 10) + "." + RandomStringUtils.randomAlphabetic(3, 5)).limit(10);
}
private static Stream<String> filenamesWithInvalidWindowsChars() {
return Arrays.stream(StringFilenameValidationUtils.INVALID_WINDOWS_SPECIFIC_CHARS)
.map(character -> {
int idx = RandomUtils.nextInt(0, CORRECT_FILENAME_PATTERN.length());
return CORRECT_FILENAME_PATTERN.substring(0, idx) + character + CORRECT_FILENAME_PATTERN.substring(idx);
});
}
private static Stream<String> filenamesWithInvalidUnixChars() {
return Arrays.stream(StringFilenameValidationUtils.INVALID_UNIX_SPECIFIC_CHARS)
.map(character -> {
int idx = RandomUtils.nextInt(0, CORRECT_FILENAME_PATTERN.length());
return CORRECT_FILENAME_PATTERN.substring(0, idx) + character + CORRECT_FILENAME_PATTERN.substring(idx);
});
}
}

View File

@ -1,21 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.baeldung.docker</groupId>
<artifactId>heap-sizing</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>heap-sizing</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../parent-boot-2</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -58,4 +58,8 @@
</plugins>
</build>
<properties>
<java.version>11</java.version>
</properties>
</project>

View File

@ -3,6 +3,11 @@ plugins {
id 'org.springframework.boot' version '2.3.4.RELEASE'
}
ext {
springBootVersion = '2.3.4.RELEASE'
lombokVersion = '1.18.14'
}
group = 'com.gradle'
version = '1.0.0'
sourceCompatibility = '14'
@ -12,19 +17,16 @@ repositories {
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter:2.3.4.RELEASE'
implementation "org.springframework.boot:spring-boot-starter:${springBootVersion}"
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.3.4.RELEASE'
compileOnly 'org.projectlombok:lombok:1.18.14'
testCompileOnly 'org.projectlombok:lombok:1.18.14'
compileOnly "org.projectlombok:lombok:${lombokVersion}"
runtimeOnly files('libs/sampleOne.jar', 'libs/sampleTwo.jar')
runtimeOnly fileTree("libs") { include "*.jar" }
runtimeOnly fileTree('libs') { include '*.jar' }
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
// implementation gradleApi()
testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
}
test {

234
gradle/gradle-dependency-management/gradlew vendored Executable file
View File

@ -0,0 +1,234 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
#
# Gradle start up script for POSIX generated by Gradle.
#
# Important for running:
#
# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
# noncompliant, but you have some other compliant shell such as ksh or
# bash, then to run this script, type that shell name before the whole
# command line, like:
#
# ksh Gradle
#
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
# * compound commands having a testable exit status, especially «case»;
# * various built-in commands including «command», «set», and «ulimit».
#
# Important for patching:
#
# (2) This script targets any POSIX shell, so it avoids extensions provided
# by Bash, Ksh, etc; in particular arrays are avoided.
#
# The "traditional" practice of packing multiple parameters into a
# space-separated string is a well documented source of bugs and security
# problems, so this is (mostly) avoided, by progressively accumulating
# options in "$@", and eventually passing that to Java.
#
# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
# see the in-line comments for details.
#
# There are tweaks for specific operating systems such as AIX, CygWin,
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
#
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
app_path=$0
# Need this for daisy-chained symlinks.
while
APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
[ -h "$app_path" ]
do
ls=$( ls -ld "$app_path" )
link=${ls#*' -> '}
case $link in #(
/*) app_path=$link ;; #(
*) app_path=$APP_HOME$link ;;
esac
done
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
APP_NAME="Gradle"
APP_BASE_NAME=${0##*/}
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
warn () {
echo "$*"
} >&2
die () {
echo
echo "$*"
echo
exit 1
} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "$( uname )" in #(
CYGWIN* ) cygwin=true ;; #(
Darwin* ) darwin=true ;; #(
MSYS* | MINGW* ) msys=true ;; #(
NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD=$JAVA_HOME/jre/sh/java
else
JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD=java
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
fi
# Collect all arguments for the java command, stacking in reverse order:
# * args from the command line
# * the main class name
# * -classpath
# * -D...appname settings
# * --module-path (only if needed)
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
if "$cygwin" || "$msys" ; then
APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
JAVACMD=$( cygpath --unix "$JAVACMD" )
# Now convert the arguments - kludge to limit ourselves to /bin/sh
for arg do
if
case $arg in #(
-*) false ;; # don't mess with options #(
/?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
[ -e "$t" ] ;; #(
*) false ;;
esac
then
arg=$( cygpath --path --ignore --mixed "$arg" )
fi
# Roll the args list around exactly as many times as the number of
# args, so each arg winds up back in the position where it started, but
# possibly modified.
#
# NB: a `for` loop captures its iteration list before it begins, so
# changing the positional parameters here affects neither the number of
# iterations, nor the values presented in `arg`.
shift # remove old arg
set -- "$@" "$arg" # push replacement arg
done
fi
# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \
"$@"
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
#
# In Bash we could simply go:
#
# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
# set -- "${ARGS[@]}" "$@"
#
# but POSIX shell has neither arrays nor command substitution, so instead we
# post-process each arg (as a line of input to sed) to backslash-escape any
# character that might be a shell metacharacter, then use eval to reverse
# that process (while maintaining the separation between arguments), and wrap
# the whole thing up as a single "set" statement.
#
# This will of course break if any of these variables contains a newline or
# an unmatched quote.
#
eval "set -- $(
printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
xargs -n1 |
sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
tr '\n' ' '
)" '"$@"'
exec "$JAVACMD" "$@"

View File

@ -0,0 +1,89 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega

Binary file not shown.

Binary file not shown.

74
ksqldb/pom.xml Normal file
View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>ksqldb-app</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ksqldb</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<repositories>
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>${ksqldb.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<ksqldb.version>6.2.0</ksqldb.version>
<assertj.version>3.20.2</assertj.version>
<awaitility.version>4.1.0</awaitility.version>
<testcontainers.version>1.15.3</testcontainers.version>
</properties>
</project>

View File

@ -0,0 +1,29 @@
package com.baeldung.ksqldb;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Alert {
@JsonProperty(value = "SENSOR_ID")
private String sensorId;
@JsonProperty(value = "START_PERIOD")
private String startPeriod;
@JsonProperty(value = "END_PERIOD")
private String endPeriod;
@JsonProperty(value = "AVERAGE_READING")
private double averageReading;
}

View File

@ -0,0 +1,75 @@
package com.baeldung.ksqldb;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@AllArgsConstructor
@Slf4j
public class KsqlDBApplication {
private static final String CREATE_READINGS_STREAM = ""
+ " CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)"
+ " WITH (KAFKA_TOPIC = 'readings',"
+ " VALUE_FORMAT = 'JSON',"
+ " TIMESTAMP = 'timestamp',"
+ " TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',"
+ " PARTITIONS = 1);";
private static final String CREATE_ALERTS_TABLE = ""
+ " CREATE TABLE alerts AS"
+ " SELECT"
+ " sensor_id,"
+ " TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS start_period,"
+ " TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS end_period,"
+ " AVG(reading) AS average_reading"
+ " FROM readings"
+ " WINDOW TUMBLING (SIZE 30 MINUTES)"
+ " GROUP BY sensor_id"
+ " HAVING AVG(reading) > 25"
+ " EMIT CHANGES;";
private static final String ALERTS_QUERY = "SELECT * FROM alerts EMIT CHANGES;";
private static final String READINGS_STREAM = "readings";
private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
private final Client client;
public CompletableFuture<ExecuteStatementResult> createReadingsStream() {
return client.executeStatement(CREATE_READINGS_STREAM, PROPERTIES);
}
public CompletableFuture<ExecuteStatementResult> createAlertsTable() {
return client.executeStatement(CREATE_ALERTS_TABLE, PROPERTIES);
}
public CompletableFuture<Void> insert(Collection<KsqlObject> rows) {
return CompletableFuture.allOf(
rows.stream()
.map(row -> client.insertInto(READINGS_STREAM, row))
.toArray(CompletableFuture[]::new)
);
}
public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
return client.streamQuery(ALERTS_QUERY, PROPERTIES)
.thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Alerts push query failed", ex);
}
});
}
}

View File

@ -0,0 +1,10 @@
package com.baeldung.ksqldb;
import lombok.Data;
@Data
public class Reading {
private String id;
private String timestamp;
private int reading;
}

View File

@ -0,0 +1,60 @@
package com.baeldung.ksqldb;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.api.client.Row;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class RowSubscriber<T> implements Subscriber<Row> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Class<T> clazz;
private Subscription subscription;
public List<T> consumedItems = new ArrayList<>();
public RowSubscriber(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
log.info("Subscriber is subscribed.");
this.subscription = subscription;
subscription.request(1);
}
@Override
public synchronized void onNext(Row row) {
String jsonString = row.asObject().toJsonString();
log.info("Row JSON: {}", jsonString);
try {
T item = OBJECT_MAPPER.readValue(jsonString, this.clazz);
log.info("Item: {}", item);
consumedItems.add(item);
} catch (JsonProcessingException e) {
log.error("Unable to parse json", e);
}
// Request the next row
subscription.request(1);
}
@Override
public synchronized void onError(Throwable t) {
log.error("Received an error", t);
}
@Override
public synchronized void onComplete() {
log.info("Query has ended.");
}
}

View File

@ -0,0 +1,160 @@
package com.baeldung.ksqldb;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.given;
@Testcontainers
class KsqlDBApplicationLiveTest {
private static final File KSQLDB_COMPOSE_FILE = new File("src/test/resources/docker/docker-compose.yml");
private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
private static final String KSQLDB_SERVER_HOST = "localhost";
private static final int KSQLDB_SERVER_PORT = 8088;
@Container
public static DockerComposeContainer dockerComposeContainer =
new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
.withServices("zookeeper", "broker", "ksqldb-server")
.withExposedService("ksqldb-server", 8088,
Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
.withLocalCompose(true);
private KsqlDBApplication ksqlDBApplication;
private Client client;
@BeforeEach
void setup() {
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_PORT);
client = Client.create(options);
ksqlDBApplication = new KsqlDBApplication(client);
}
@AfterEach
void tearDown() {
deleteAlerts();
}
@Test
void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
createAlertsMaterializedView();
RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);
CompletableFuture<Void> result = ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
insertSampleData();
assertThat(result).isNotNull();
await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
assertThat(alertSubscriber.consumedItems)
.containsOnly(
expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
)
);
}
@Test
void givenSensorReadings_whenPullQueryForRow_thenRowIsReturned() {
createAlertsMaterializedView();
insertSampleData();
String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";
given().ignoreExceptions()
.await().atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
// it may be possible that the materialized view is not updated with sample data yet
// so ignore TimeoutException and try again
List<Row> rows = client.executeQuery(pullQuery, PROPERTIES)
.get(10, TimeUnit.SECONDS);
assertThat(rows).hasSize(1);
Row row = rows.get(0);
assertThat(row.getString("SENSOR_ID")).isEqualTo("sensor-2");
assertThat(row.getString("START_PERIOD")).isEqualTo("2021-08-01 10:00:00");
assertThat(row.getString("END_PERIOD")).isEqualTo("2021-08-01 10:30:00");
assertThat(row.getDouble("AVERAGE_READING")).isEqualTo(26.0);
});
}
private void createAlertsMaterializedView() {
ksqlDBApplication.createReadingsStream().join();
ksqlDBApplication.createAlertsTable().join();
}
private void insertSampleData() {
ksqlDBApplication.insert(
Arrays.asList(
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:20:00").put("reading", 20),
// these reading will exceed the alert threshold (sensor-1)
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:30:00").put("reading", 24),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:40:00").put("reading", 30),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:50:00").put("reading", 30),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:00:00").put("reading", 24),
// these reading will exceed the alert threshold (sensor-2)
new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:10:00").put("reading", 26),
new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:20:00").put("reading", 26),
new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:30:00").put("reading", 24)
)
).join();
}
private void deleteAlerts() {
client.listQueries()
.thenApply(queryInfos -> queryInfos.stream()
.filter(queryInfo -> queryInfo.getQueryType() == QueryType.PERSISTENT)
.map(QueryInfo::getId)
.findFirst()
.orElseThrow(() -> new RuntimeException("Persistent query not found")))
.thenCompose(id -> client.executeStatement("TERMINATE " + id + ";"))
.thenCompose(result -> client.executeStatement("DROP TABLE alerts DELETE TOPIC;"))
.thenCompose(result -> client.executeStatement("DROP STREAM readings DELETE TOPIC;"))
.join();
}
private Alert expectedAlert(String sensorId, String startPeriod, String endPeriod, double average) {
return Alert.builder()
.sensorId(sensorId)
.startPeriod(startPeriod)
.endPeriod(endPeriod)
.averageReading(average)
.build();
}
}

View File

@ -0,0 +1,49 @@
---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
hostname: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.2.0
hostname: broker
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.19.0
hostname: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
healthcheck:
test: curl -f http://ksqldb-server:8088/ || exit 1
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.19.0
hostname: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true

View File

@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n

View File

@ -13,7 +13,6 @@ Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-m
- [Implementing a FTP-Client in Java](https://www.baeldung.com/java-ftp-client)
- [Introduction to Functional Java](https://www.baeldung.com/java-functional-library)
- [A Guide to the Reflections Library](https://www.baeldung.com/reflections-library)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
- [Introduction to Protonpack](https://www.baeldung.com/java-protonpack)
- [Java-R Integration](https://www.baeldung.com/java-r-integration)
- [Using libphonenumber to Validate Phone Numbers](https://www.baeldung.com/java-libphonenumber)

View File

@ -22,18 +22,6 @@
<artifactId>protonpack</artifactId>
<version>${protonpack.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
@ -148,7 +136,6 @@
</build>
<properties>
<kafka.version>2.0.0</kafka.version>
<javapoet.version>1.10.0</javapoet.version>
<reflections.version>0.9.11</reflections.version>
<mockftpserver.version>2.7.1</mockftpserver.version>

View File

@ -11,8 +11,6 @@ This module contains articles about libraries for data processing in Java.
- [An Introduction to SuanShu](https://www.baeldung.com/suanshu)
- [Intro to Derive4J](https://www.baeldung.com/derive4j)
- [Univocity Parsers](https://www.baeldung.com/java-univocity-parsers)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- More articles: [[<-- prev]](/../libraries-data)
##### Building the project

View File

@ -116,11 +116,6 @@
<artifactId>univocity-parsers</artifactId>
<version>${univocity.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -144,13 +139,6 @@
<version>${byte-buddy.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
@ -176,7 +164,6 @@
<slf4j.version>1.7.25</slf4j.version>
<awaitility.version>3.0.0</awaitility.version>
<univocity.version>2.8.4</univocity.version>
<kafka.version>2.5.0</kafka.version>
<guava.version>29.0-jre</guava.version>
</properties>

View File

@ -1,82 +0,0 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
public static void capitalize() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer =
createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream =
environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer =
createStringProducer(outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer
.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer =
createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream =
environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void main(String[] args) throws Exception {
createBackup();
}
}

View File

@ -1,34 +0,0 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
Backup backup = new Backup(inputMessages, LocalDateTime.now());
return backup;
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

View File

@ -1,11 +0,0 @@
## Data Libraries
This module contains articles about libraries for data processing in Java.
### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- More articles: [[<-- prev]](/../libraries-data-2)
##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE. If you have issues with the derive4j imports in your IDE, you have to add the folder: *target/generated-sources/annotations* to the project build path in your IDE.

View File

@ -1,64 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>libraries-data-3</artifactId>
<name>libraries-data-3</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<assertj.version>3.6.2</assertj.version>
<slf4j.version>1.7.25</slf4j.version>
<kafka.version>2.8.0</kafka.version>
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
</properties>
</project>

View File

@ -3,14 +3,10 @@
This module contains articles about libraries for data processing in Java.
### Relevant articles
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to JCache](https://www.baeldung.com/jcache)
- [A Guide to Apache Ignite](https://www.baeldung.com/apache-ignite)
- [Apache Ignite with Spring Data](https://www.baeldung.com/apache-ignite-spring-data)
- [A Guide to Apache Crunch](https://www.baeldung.com/apache-crunch)
- [Intro to Apache Storm](https://www.baeldung.com/apache-storm)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Guide to JMapper](https://www.baeldung.com/jmapper)
More articles: [[next -->]](/../libraries-data-2)

View File

@ -0,0 +1,3 @@
### Relevant Articles:
- [Hosting a Maven Repository on GitHub](https://www.baeldung.com/maven-repo-github)

View File

@ -81,13 +81,18 @@
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.netflix.spectator</groupId>
<artifactId>spectator-api</artifactId>
<version>0.132.0</version>
</dependency>
</dependencies>
<properties>
<dep.ver.metrics>3.1.2</dep.ver.metrics>
<dep.ver.servlet>3.1.0</dep.ver.servlet>
<netflix.servo.ver>0.12.17</netflix.servo.ver>
<micrometer.ver>0.12.0.RELEASE</micrometer.ver>
<micrometer.ver>1.7.1</micrometer.ver>
<!-- <fasterxml.jackson.version>2.9.1</fasterxml.jackson.version> -->
<spring-boot-starter-web.version>2.0.7.RELEASE</spring-boot-starter-web.version>
<assertj-core.version>3.11.1</assertj-core.version>

View File

@ -1,10 +1,10 @@
package com.baeldung.metrics.micrometer;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import io.micrometer.core.instrument.binder.JvmThreadMetrics;
@SpringBootApplication
public class MicrometerApp {
@ -14,7 +14,7 @@ public class MicrometerApp {
return new JvmThreadMetrics();
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
SpringApplication.run(MicrometerApp.class, args);
}

View File

@ -1,11 +1,7 @@
package com.baeldung.metrics.micrometer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.assertj.core.api.Assertions.withinPercentage;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -16,29 +12,25 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter.Type;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.instrument.stats.hist.Histogram;
import io.micrometer.core.instrument.stats.quantile.WindowSketchQuantiles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.data.Percentage;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import com.netflix.spectator.atlas.AtlasConfig;
@ -55,7 +47,7 @@ public class MicrometerAtlasIntegrationTest {
@Override
public Duration step() {
return Duration.ofSeconds(1);
return Duration.ofSeconds(10);
}
@Override
@ -77,9 +69,9 @@ public class MicrometerAtlasIntegrationTest {
compositeRegistry.gauge("baeldung.heat", 90);
Optional<Gauge> oneGauge = oneSimpleMeter
Optional<Gauge> oneGauge = Optional.ofNullable(oneSimpleMeter
.find("baeldung.heat")
.gauge();
.gauge());
assertTrue(oneGauge.isPresent());
Iterator<Measurement> measurements = oneGauge
.get()
@ -91,9 +83,9 @@ public class MicrometerAtlasIntegrationTest {
.next()
.getValue(), equalTo(90.00));
Optional<Gauge> atlasGauge = atlasMeterRegistry
Optional<Gauge> atlasGauge = Optional.ofNullable(atlasMeterRegistry
.find("baeldung.heat")
.gauge();
.gauge());
assertTrue(atlasGauge.isPresent());
Iterator<Measurement> anotherMeasurements = atlasGauge
.get()
@ -122,14 +114,14 @@ public class MicrometerAtlasIntegrationTest {
.increment();
new CountedObject();
Optional<Counter> counterOptional = Metrics.globalRegistry
Optional<Counter> counterOptional = Optional.ofNullable(Metrics.globalRegistry
.find("objects.instance")
.counter();
.counter());
assertTrue(counterOptional.isPresent());
assertTrue(counterOptional
assertEquals(counterOptional
.get()
.count() == 2.0);
.count() , 2.0, 0.0);
}
@Test
@ -142,10 +134,10 @@ public class MicrometerAtlasIntegrationTest {
.register(registry);
counter.increment(2.0);
assertTrue(counter.count() == 2);
assertEquals(counter.count(), 2, 0);
counter.increment(-1);
assertTrue(counter.count() == 2);
assertEquals(counter.count(), 1, 0);
}
@Test
@ -161,7 +153,7 @@ public class MicrometerAtlasIntegrationTest {
timer.record(30, TimeUnit.MILLISECONDS);
assertTrue(2 == timer.count());
assertEquals(2, timer.count(), 0);
assertThat(timer.totalTime(TimeUnit.MILLISECONDS)).isBetween(40.0, 55.0);
}
@ -173,12 +165,12 @@ public class MicrometerAtlasIntegrationTest {
.builder("3rdPartyService")
.register(registry);
long currentTaskId = longTaskTimer.start();
LongTaskTimer.Sample currentTaskId = longTaskTimer.start();
try {
TimeUnit.MILLISECONDS.sleep(2);
} catch (InterruptedException ignored) {
}
long timeElapsed = longTaskTimer.stop(currentTaskId);
long timeElapsed = currentTaskId.stop();
assertEquals(2L, timeElapsed/((int) 1e6),1L);
}
@ -191,10 +183,10 @@ public class MicrometerAtlasIntegrationTest {
.builder("cache.size", list, List::size)
.register(registry);
assertTrue(gauge.value() == 0.0);
assertEquals(gauge.value(), 0.0, 0.0);
list.add("1");
assertTrue(gauge.value() == 1.0);
assertEquals(gauge.value(), 1.0, 0.0);
}
@Test
@ -208,18 +200,17 @@ public class MicrometerAtlasIntegrationTest {
distributionSummary.record(4);
distributionSummary.record(5);
assertTrue(3 == distributionSummary.count());
assertTrue(12 == distributionSummary.totalAmount());
assertEquals(3, distributionSummary.count(), 0);
assertEquals(12, distributionSummary.totalAmount(), 0);
}
@Test
public void givenTimer_whenEnrichWithQuantile_thenQuantilesComputed() {
public void givenTimer_whenEnrichWithPercentile_thenPercentilesComputed() {
SimpleMeterRegistry registry = new SimpleMeterRegistry();
Timer timer = Timer
.builder("test.timer")
.quantiles(WindowSketchQuantiles
.quantiles(0.3, 0.5, 0.95)
.create())
.publishPercentiles(0.3, 0.5, 0.95)
.publishPercentileHistogram()
.register(registry);
timer.record(2, TimeUnit.SECONDS);
@ -229,27 +220,18 @@ public class MicrometerAtlasIntegrationTest {
timer.record(8, TimeUnit.SECONDS);
timer.record(13, TimeUnit.SECONDS);
Map<String, Integer> quantileMap = extractTagValueMap(registry, Type.Gauge, 1e9);
assertThat(quantileMap, allOf(hasEntry("quantile=0.3", 2), hasEntry("quantile=0.5", 3), hasEntry("quantile=0.95", 8)));
}
Map<Double, Double> expectedMicrometer = new TreeMap<>();
expectedMicrometer.put(0.3, 1946.157056);
expectedMicrometer.put(0.5, 3019.89888);
expectedMicrometer.put(0.95, 13354.663936);
private Map<String, Integer> extractTagValueMap(MeterRegistry registry, Type meterType, double valueDivisor) {
return registry
.getMeters()
.stream()
.filter(meter -> meter.getType() == meterType)
.collect(Collectors.toMap(meter -> {
Tag tag = meter
.getId()
.getTags()
.iterator()
.next();
return tag.getKey() + "=" + tag.getValue();
}, meter -> (int) (meter
.measure()
.iterator()
.next()
.getValue() / valueDivisor)));
Map<Double, Double> actualMicrometer = new TreeMap<>();
ValueAtPercentile[] percentiles = timer.takeSnapshot().percentileValues();
for (ValueAtPercentile percentile : percentiles) {
actualMicrometer.put(percentile.percentile(), percentile.value(TimeUnit.MILLISECONDS));
}
assertEquals(expectedMicrometer, actualMicrometer);
}
@Test
@ -257,7 +239,7 @@ public class MicrometerAtlasIntegrationTest {
SimpleMeterRegistry registry = new SimpleMeterRegistry();
DistributionSummary hist = DistributionSummary
.builder("summary")
.histogram(Histogram.linear(0, 10, 5))
.serviceLevelObjectives(1, 10, 5)
.register(registry);
hist.record(3);
@ -267,17 +249,28 @@ public class MicrometerAtlasIntegrationTest {
hist.record(13);
hist.record(26);
Map<String, Integer> histograms = extractTagValueMap(registry, Type.Counter, 1.0);
Map<Integer, Double> expectedMicrometer = new TreeMap<>();
expectedMicrometer.put(1,0D);
expectedMicrometer.put(10,2D);
expectedMicrometer.put(5,1D);
assertThat(histograms, allOf(hasEntry("bucket=0.0", 0), hasEntry("bucket=10.0", 2), hasEntry("bucket=20.0", 2), hasEntry("bucket=30.0", 1), hasEntry("bucket=40.0", 1), hasEntry("bucket=Infinity", 0)));
Map<Integer, Double> actualMicrometer = new TreeMap<>();
HistogramSnapshot snapshot = hist.takeSnapshot();
Arrays.stream(snapshot.histogramCounts()).forEach(p -> {
actualMicrometer.put((Integer.valueOf((int) p.bucket())), p.count());
});
assertEquals(expectedMicrometer, actualMicrometer);
}
@Test
public void givenTimer_whenEnrichWithTimescaleHistogram_thenTimeScaleDataCollected() {
SimpleMeterRegistry registry = new SimpleMeterRegistry();
Duration[] durations = {Duration.ofMillis(25), Duration.ofMillis(300), Duration.ofMillis(600)};
Timer timer = Timer
.builder("timer")
.histogram(Histogram.linearTime(TimeUnit.MILLISECONDS, 0, 200, 3))
.sla(durations)
.publishPercentileHistogram()
.register(registry);
timer.record(1000, TimeUnit.MILLISECONDS);
@ -286,10 +279,18 @@ public class MicrometerAtlasIntegrationTest {
timer.record(341, TimeUnit.MILLISECONDS);
timer.record(500, TimeUnit.MILLISECONDS);
Map<String, Integer> histograms = extractTagValueMap(registry, Type.Counter, 1.0);
Map<Double, Double> expectedMicrometer = new TreeMap<>();
expectedMicrometer.put(2.5E7,1D);
expectedMicrometer.put(3.0E8,1D);
expectedMicrometer.put(6.0E8,4D);
assertThat(histograms, allOf(hasEntry("bucket=0.0", 0), hasEntry("bucket=2.0E8", 1), hasEntry("bucket=4.0E8", 1), hasEntry("bucket=Infinity", 3)));
Map<Double, Double> actualMicrometer = new TreeMap<>();
HistogramSnapshot snapshot = timer.takeSnapshot();
Arrays.stream(snapshot.histogramCounts()).forEach(p -> {
actualMicrometer.put((Double.valueOf((int) p.bucket())), p.count());
});
assertEquals(expectedMicrometer, actualMicrometer);
}
}

View File

@ -57,6 +57,7 @@
<module>spring-boot-persistence</module>
<module>spring-boot-persistence-h2</module>
<module>spring-boot-persistence-mongodb</module>
<module>spring-data-arangodb</module>
<module>spring-data-cassandra</module>
<module>spring-data-cassandra-test</module>
<module>spring-data-cassandra-reactive</module>

View File

@ -50,7 +50,7 @@ public class QueryXmlResourceWithConcurrentAxisIntegrationTest {
@Test
public void createDatabaseAndXMarkResourceAndCheckQuery() throws IOException {
final var pathToXmlFile = XML_DIRECTORY.resolve("10mb.xml");
final var pathToXmlFile = XML_DIRECTORY.resolve("regions.xml");
// Create an empty XML database.
Databases.createXmlDatabase(new DatabaseConfiguration(DATABASE_PATH));

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,8 @@
=========
## Spring Data ArangoDB
### Relevant Articles:
- [Spring Data with ArangoDB](https://www.baeldung.com/spring-data-arangodb)

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-data-arangodb</artifactId>
<name>spring-data-arangodb</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../parent-boot-2</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-spring-data</artifactId>
<version>3.5.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,7 @@
FROM arangodb:3.8.0
COPY init-session.js /docker-entrypoint-initdb.d/
EXPOSE 8529
ENV ARANGO_ROOT_PASSWORD=password

View File

@ -0,0 +1 @@
rs.initiate();

View File

@ -0,0 +1,5 @@
#!/bin/bash
docker image build -t spring-data-arangodb:live-test .
docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=password --name spring-data-arangodb-live-test spring-data-arangodb:live-test

View File

@ -0,0 +1,4 @@
#!/bin/bash
docker stop spring-data-arangodb-live-test
docker rm spring-data-arangodb-live-test

View File

@ -0,0 +1,3 @@
#!/bin/bash
mvn clean compile test -P live-all -f ../../../pom.xml

View File

@ -1,11 +1,13 @@
package com.baeldung.tailablecursor;
package com.baeldung.arangodb;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LogsCounterApplication {
public class ArangoDbSpringDataApplication {
public static void main(String[] args) {
SpringApplication.run(LogsCounterApplication.class, args);
SpringApplication.run(ArangoDbSpringDataApplication.class, args);
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.arangodb.configuration;
import com.arangodb.ArangoDB;
import com.arangodb.springframework.annotation.EnableArangoRepositories;
import com.arangodb.springframework.config.ArangoConfiguration;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableArangoRepositories(basePackages = {"com.baeldung"})
public class ArangoDbConfiguration implements ArangoConfiguration {
@Override
public ArangoDB.Builder arango() {
return new ArangoDB.Builder()
.host("127.0.0.1", 8529)
.user("root")
.password("password");
}
@Override
public String database() {
return "baeldung-database";
}
}

View File

@ -0,0 +1,86 @@
package com.baeldung.arangodb.model;
import com.arangodb.springframework.annotation.ArangoId;
import com.arangodb.springframework.annotation.Document;
import com.arangodb.springframework.annotation.Relations;
import org.springframework.data.annotation.Id;
import java.time.ZonedDateTime;
import java.util.Collection;
@Document("articles")
public class Article {
@Id
private String id;
@ArangoId
private String arangoId;
private String name;
private String author;
private ZonedDateTime publishDate;
private String htmlContent;
@Relations(edges = ArticleLink.class, lazy = true)
private Collection<Author> authors;
public Article() {
super();
}
public Article(String name, String author, ZonedDateTime publishDate, String htmlContent) {
this.name = name;
this.author = author;
this.publishDate = publishDate;
this.htmlContent = htmlContent;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getArangoId() {
return arangoId;
}
public void setArangoId(String arangoId) {
this.arangoId = arangoId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public ZonedDateTime getPublishDate() {
return publishDate;
}
public void setPublishDate(ZonedDateTime publishDate) {
this.publishDate = publishDate;
}
public String getHtmlContent() {
return htmlContent;
}
public void setHtmlContent(String htmlContent) {
this.htmlContent = htmlContent;
}
}

View File

@ -0,0 +1,39 @@
package com.baeldung.arangodb.model;
import com.arangodb.springframework.annotation.Edge;
import com.arangodb.springframework.annotation.From;
import com.arangodb.springframework.annotation.To;
@Edge
public class ArticleLink {
@From
private Article article;
@To
private Author author;
public ArticleLink() {
}
public ArticleLink(Article article, Author author) {
this.article = article;
this.author = author;
}
public Article getArticle() {
return article;
}
public void setArticle(Article article) {
this.article = article;
}
public Author getAuthor() {
return author;
}
public void setAuthor(Author author) {
this.author = author;
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.arangodb.model;
import com.arangodb.springframework.annotation.ArangoId;
import com.arangodb.springframework.annotation.Document;
import org.springframework.data.annotation.Id;
@Document("articles")
public class Author {
@Id
private String id;
@ArangoId
private String arangoId;
private String name;
public Author() {
super();
}
public Author(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getArangoId() {
return arangoId;
}
public void setArangoId(String arangoId) {
this.arangoId = arangoId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,17 @@
package com.baeldung.arangodb.repository;
import com.arangodb.springframework.annotation.Query;
import com.arangodb.springframework.repository.ArangoRepository;
import com.baeldung.arangodb.model.Article;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
@Repository
public interface ArticleRepository extends ArangoRepository<Article, String> {
Iterable<Article> findByAuthor(String author);
@Query("FOR a IN articles FILTER a.author == @author SORT a.publishDate ASC RETURN a")
Iterable<Article> getByAuthor(@Param("author") String author);
}

View File

@ -0,0 +1,3 @@
arangodb.hosts=127.0.0.1:8529
arangodb.user=root
arangodb.password=password

View File

@ -0,0 +1,113 @@
package com.baeldung.arangodb;
import com.baeldung.arangodb.model.Article;
import com.baeldung.arangodb.repository.ArticleRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SpringBootTest
public class ArticleRepositoryIntegrationTest {
@Autowired
ArticleRepository articleRepository;
@Test
public void givenNewArticle_whenSaveInArangoDb_thenDataIsCorrect() {
Article newArticle = new Article(
"ArangoDb with Spring Data",
"Baeldung Writer",
ZonedDateTime.now(),
"<html>Some HTML content</html>"
);
Article savedArticle = articleRepository.save(newArticle);
assertNotNull(savedArticle.getId());
assertNotNull(savedArticle.getArangoId());
assertEquals(savedArticle.getName(), newArticle.getName());
assertEquals(savedArticle.getAuthor(), newArticle.getAuthor());
assertEquals(savedArticle.getPublishDate(), newArticle.getPublishDate());
assertEquals(savedArticle.getHtmlContent(), newArticle.getHtmlContent());
}
@Test
public void givenArticleId_whenReadFromArangoDb_thenDataIsCorrect() {
Article newArticle = new Article(
"ArangoDb with Spring Data",
"Baeldung Writer",
ZonedDateTime.now(),
"<html>Some HTML content</html>"
);
Article savedArticle = articleRepository.save(newArticle);
String articleId = savedArticle.getId();
Optional<Article> article = articleRepository.findById(articleId);
assertTrue(article.isPresent());
Article foundArticle = article.get();
assertEquals(foundArticle.getId(), articleId);
assertEquals(foundArticle.getArangoId(), savedArticle.getArangoId());
assertEquals(foundArticle.getName(), savedArticle.getName());
assertEquals(foundArticle.getAuthor(), savedArticle.getAuthor());
assertEquals(foundArticle.getPublishDate(), savedArticle.getPublishDate());
assertEquals(foundArticle.getHtmlContent(), savedArticle.getHtmlContent());
}
@Test
public void givenArticleId_whenDeleteFromArangoDb_thenDataIsGone() {
Article newArticle = new Article(
"ArangoDb with Spring Data",
"Baeldung Writer",
ZonedDateTime.now(),
"<html>Some HTML content</html>"
);
Article savedArticle = articleRepository.save(newArticle);
String articleId = savedArticle.getId();
articleRepository.deleteById(articleId);
Optional<Article> article = articleRepository.findById(articleId);
assertFalse(article.isPresent());
}
@Test
public void givenAuthorName_whenGetByAuthor_thenListOfArticles() {
Article newArticle = new Article(
"ArangoDb with Spring Data",
"Baeldung Writer",
ZonedDateTime.now(),
"<html>Some HTML content</html>"
);
articleRepository.save(newArticle);
Iterable<Article> articlesByAuthor = articleRepository.findByAuthor(newArticle.getAuthor());
List<Article> articlesByAuthorList = new ArrayList<>();
articlesByAuthor.forEach(articlesByAuthorList::add);
assertEquals(1, articlesByAuthorList.size());
Article foundArticle = articlesByAuthorList.get(0);
assertEquals(foundArticle.getName(), newArticle.getName());
assertEquals(foundArticle.getAuthor(), newArticle.getAuthor());
assertEquals(foundArticle.getPublishDate(), newArticle.getPublishDate());
assertEquals(foundArticle.getHtmlContent(), newArticle.getHtmlContent());
}
}

Some files were not shown because too many files have changed in this diff Show More