Merge branch 'eugenp:master' into master
This commit is contained in:
commit
f49b1f9235
|
@ -8,6 +8,9 @@ import java.util.PriorityQueue;
|
|||
import java.util.Queue;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class RouteFinder<T extends GraphNode> {
|
||||
private final Graph<T> graph;
|
||||
private final Scorer<T> nextNodeScorer;
|
||||
|
@ -28,11 +31,11 @@ public class RouteFinder<T extends GraphNode> {
|
|||
openSet.add(start);
|
||||
|
||||
while (!openSet.isEmpty()) {
|
||||
System.out.println("Open Set contains: " + openSet.stream().map(RouteNode::getCurrent).collect(Collectors.toSet()));
|
||||
log.debug("Open Set contains: " + openSet.stream().map(RouteNode::getCurrent).collect(Collectors.toSet()));
|
||||
RouteNode<T> next = openSet.poll();
|
||||
System.out.println("Looking at node: " + next);
|
||||
log.debug("Looking at node: " + next);
|
||||
if (next.getCurrent().equals(to)) {
|
||||
System.out.println("Found our destination!");
|
||||
log.debug("Found our destination!");
|
||||
|
||||
List<T> route = new ArrayList<>();
|
||||
RouteNode<T> current = next;
|
||||
|
@ -41,7 +44,7 @@ public class RouteFinder<T extends GraphNode> {
|
|||
current = allNodes.get(current.getPrevious());
|
||||
} while (current != null);
|
||||
|
||||
System.out.println("Route: " + route);
|
||||
log.debug("Route: " + route);
|
||||
return route;
|
||||
}
|
||||
|
||||
|
@ -55,7 +58,7 @@ public class RouteFinder<T extends GraphNode> {
|
|||
nextNode.setRouteScore(newScore);
|
||||
nextNode.setEstimatedScore(newScore + targetScorer.computeCost(connection, to));
|
||||
openSet.add(nextNode);
|
||||
System.out.println("Found a better route to node: " + nextNode);
|
||||
log.debug("Found a better route to node: " + nextNode);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package com.baeldung.algorithms.astar.underground;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -10,9 +12,13 @@ import java.util.stream.Stream;
|
|||
|
||||
import com.baeldung.algorithms.astar.Graph;
|
||||
import com.baeldung.algorithms.astar.RouteFinder;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@Slf4j
|
||||
public class RouteFinderIntegrationTest {
|
||||
|
||||
private Graph<Station> underground;
|
||||
|
@ -637,7 +643,8 @@ public class RouteFinderIntegrationTest {
|
|||
@Test
|
||||
public void findRoute() {
|
||||
List<Station> route = routeFinder.findRoute(underground.getNode("74"), underground.getNode("7"));
|
||||
assertThat(route).size().isPositive();
|
||||
|
||||
System.out.println(route.stream().map(Station::getName).collect(Collectors.toList()));
|
||||
route.stream().map(Station::getName).collect(Collectors.toList()).forEach(station -> log.debug(station));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,19 +20,16 @@
|
|||
<version>${org.assertj.core.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-collections4</artifactId>
|
||||
<version>${commons-collections4.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>${guava.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.squareup.retrofit2</groupId>
|
||||
<artifactId>retrofit</artifactId>
|
||||
|
@ -43,13 +40,11 @@
|
|||
<artifactId>converter-jackson</artifactId>
|
||||
<version>${retrofit.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons.lang3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>pl.pragmatists</groupId>
|
||||
<artifactId>JUnitParams</artifactId>
|
||||
|
|
|
@ -39,7 +39,6 @@
|
|||
<artifactId>junit-platform-commons</artifactId>
|
||||
<version>${junit.platform.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
|
|
|
@ -20,4 +20,4 @@
|
|||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -37,4 +37,4 @@
|
|||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -52,4 +52,4 @@
|
|||
<httpclient.version>4.5.2</httpclient.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -107,4 +107,4 @@
|
|||
<cargo-maven2-plugin.version>1.6.1</cargo-maven2-plugin.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -38,4 +38,4 @@
|
|||
<cxf.version>3.1.8</cxf.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -18,4 +18,4 @@
|
|||
<module>sse-jaxrs-client</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -14,7 +14,6 @@
|
|||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>javax.ws.rs</groupId>
|
||||
<artifactId>javax.ws.rs-api</artifactId>
|
||||
|
@ -33,7 +32,6 @@
|
|||
<version>${bind-api.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -85,4 +83,4 @@
|
|||
<bind-api.version>1.0</bind-api.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -0,0 +1,18 @@
|
|||
## Apache Kafka
|
||||
|
||||
This module contains articles about Apache Kafka.
|
||||
|
||||
### Relevant articles
|
||||
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
|
||||
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
|
||||
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
|
||||
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
|
||||
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
|
||||
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
|
||||
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
|
||||
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
|
||||
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)
|
||||
|
||||
|
||||
##### Building the project
|
||||
You can build the project from the command line using: *mvn clean install*, or in an IDE.
|
|
@ -0,0 +1,180 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>apache-kafka</artifactId>
|
||||
<name>apache-kafka</name>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>${org.slf4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>${org.slf4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-core</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-java</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-test-utils_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>${guava.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility-proxy</artifactId>
|
||||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>kafka</artifactId>
|
||||
<version>${testcontainers-kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<version>${testcontainers-jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-graphx_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-streaming_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-mllib_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
|
||||
<version>${org.apache.spark.spark-core.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.spark</groupId>
|
||||
<artifactId>spark-cassandra-connector_2.11</artifactId>
|
||||
<version>${com.datastax.spark.spark-cassandra-connector.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.spark</groupId>
|
||||
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
|
||||
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<assertj.version>3.6.2</assertj.version>
|
||||
<kafka.version>2.8.0</kafka.version>
|
||||
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
|
||||
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
|
||||
<flink.version>1.5.0</flink.version>
|
||||
<awaitility.version>3.0.0</awaitility.version>
|
||||
<guava.version>29.0-jre</guava.version>
|
||||
<org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
|
||||
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
|
||||
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
|
||||
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,70 @@
|
|||
package com.baeldung.flink;
|
||||
|
||||
import com.baeldung.flink.model.Backup;
|
||||
import com.baeldung.flink.model.InputMessage;
|
||||
import com.baeldung.flink.operator.BackupAggregator;
|
||||
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
|
||||
import com.baeldung.flink.operator.WordsCapitalizer;
|
||||
import org.apache.flink.streaming.api.TimeCharacteristic;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
|
||||
|
||||
import static com.baeldung.flink.connector.Consumers.*;
|
||||
import static com.baeldung.flink.connector.Producers.*;
|
||||
|
||||
public class FlinkDataPipeline {
|
||||
|
||||
public static void capitalize() throws Exception {
|
||||
String inputTopic = "flink_input";
|
||||
String outputTopic = "flink_output";
|
||||
String consumerGroup = "baeldung";
|
||||
String address = "localhost:9092";
|
||||
|
||||
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
|
||||
flinkKafkaConsumer.setStartFromEarliest();
|
||||
|
||||
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
|
||||
|
||||
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
|
||||
|
||||
stringInputStream.map(new WordsCapitalizer())
|
||||
.addSink(flinkKafkaProducer);
|
||||
|
||||
environment.execute();
|
||||
}
|
||||
|
||||
public static void createBackup() throws Exception {
|
||||
String inputTopic = "flink_input";
|
||||
String outputTopic = "flink_output";
|
||||
String consumerGroup = "baeldung";
|
||||
String kafkaAddress = "localhost:9092";
|
||||
|
||||
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
|
||||
|
||||
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
|
||||
flinkKafkaConsumer.setStartFromEarliest();
|
||||
|
||||
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
|
||||
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
|
||||
|
||||
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
|
||||
|
||||
inputMessagesStream.timeWindowAll(Time.hours(24))
|
||||
.aggregate(new BackupAggregator())
|
||||
.addSink(flinkKafkaProducer);
|
||||
|
||||
environment.execute();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
createBackup();
|
||||
}
|
||||
|
||||
}
|
|
@ -9,23 +9,20 @@ import java.util.Properties;
|
|||
|
||||
public class Consumers {
|
||||
|
||||
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
|
||||
String topic, String kafkaAddress, String kafkaGroup ) {
|
||||
Properties props = new Properties();
|
||||
props.setProperty("bootstrap.servers", kafkaAddress);
|
||||
props.setProperty("group.id",kafkaGroup);
|
||||
FlinkKafkaConsumer011<String> consumer =
|
||||
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
|
||||
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
|
||||
Properties props = new Properties();
|
||||
props.setProperty("bootstrap.servers", kafkaAddress);
|
||||
props.setProperty("group.id", kafkaGroup);
|
||||
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
|
||||
|
||||
return consumer;
|
||||
}
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
|
||||
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("bootstrap.servers", kafkaAddress);
|
||||
properties.setProperty("group.id",kafkaGroup);
|
||||
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
|
||||
topic, new InputMessageDeserializationSchema(),properties);
|
||||
properties.setProperty("group.id", kafkaGroup);
|
||||
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
|
||||
|
||||
return consumer;
|
||||
}
|
|
@ -18,6 +18,7 @@ public class InputMessage {
|
|||
public String getSender() {
|
||||
return sender;
|
||||
}
|
||||
|
||||
public void setSender(String sender) {
|
||||
this.sender = sender;
|
||||
}
|
||||
|
@ -55,12 +56,14 @@ public class InputMessage {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
InputMessage message1 = (InputMessage) o;
|
||||
return Objects.equal(sender, message1.sender) &&
|
||||
Objects.equal(recipient, message1.recipient) &&
|
||||
Objects.equal(sentAt, message1.sentAt) &&
|
||||
return Objects.equal(sender, message1.sender) &&
|
||||
Objects.equal(recipient, message1.recipient) &&
|
||||
Objects.equal(sentAt, message1.sentAt) &&
|
||||
Objects.equal(message, message1.message);
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
package com.baeldung.flink.operator;
|
||||
|
||||
import com.baeldung.flink.model.Backup;
|
||||
import com.baeldung.flink.model.InputMessage;
|
||||
import org.apache.flink.api.common.functions.AggregateFunction;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
|
||||
@Override
|
||||
public List<InputMessage> createAccumulator() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
|
||||
inputMessages.add(inputMessage);
|
||||
return inputMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Backup getResult(List<InputMessage> inputMessages) {
|
||||
Backup backup = new Backup(inputMessages, LocalDateTime.now());
|
||||
return backup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
|
||||
inputMessages.addAll(acc1);
|
||||
return inputMessages;
|
||||
}
|
||||
}
|
|
@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate
|
|||
@Override
|
||||
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
|
||||
ZoneId zoneId = ZoneId.systemDefault();
|
||||
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
|
||||
return element.getSentAt()
|
||||
.atZone(zoneId)
|
||||
.toEpochSecond() * 1000;
|
||||
}
|
||||
|
||||
@Nullable
|
|
@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class BackupSerializationSchema
|
||||
implements SerializationSchema<Backup> {
|
||||
public class BackupSerializationSchema implements SerializationSchema<Backup> {
|
||||
|
||||
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
|
||||
|
@ -18,7 +17,7 @@ public class BackupSerializationSchema
|
|||
|
||||
@Override
|
||||
public byte[] serialize(Backup backupMessage) {
|
||||
if(objectMapper == null) {
|
||||
if (objectMapper == null) {
|
||||
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
|
||||
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
}
|
|
@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
public class InputMessageDeserializationSchema implements
|
||||
DeserializationSchema<InputMessage> {
|
||||
public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
|
||||
|
||||
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
|
||||
|
||||
@Override
|
||||
public InputMessage deserialize(byte[] bytes) throws IOException {
|
||||
|
|
@ -27,11 +27,11 @@ public class KafkaTopicApplication {
|
|||
short replicationFactor = 1;
|
||||
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
|
||||
|
||||
CreateTopicsResult result = admin.createTopics(
|
||||
Collections.singleton(newTopic));
|
||||
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
|
||||
|
||||
// get the async result for the new topic creation
|
||||
KafkaFuture<Void> future = result.values().get(topicName);
|
||||
KafkaFuture<Void> future = result.values()
|
||||
.get(topicName);
|
||||
|
||||
// call get() to block until topic creation has completed or failed
|
||||
future.get();
|
||||
|
@ -47,15 +47,13 @@ public class KafkaTopicApplication {
|
|||
short replicationFactor = 1;
|
||||
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
|
||||
|
||||
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
|
||||
.validateOnly(true)
|
||||
.retryOnQuotaViolation(true);
|
||||
CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true)
|
||||
.retryOnQuotaViolation(true);
|
||||
|
||||
CreateTopicsResult result = admin.createTopics(
|
||||
Collections.singleton(newTopic), topicOptions
|
||||
);
|
||||
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions);
|
||||
|
||||
KafkaFuture<Void> future = result.values().get(topicName);
|
||||
KafkaFuture<Void> future = result.values()
|
||||
.get(topicName);
|
||||
future.get();
|
||||
}
|
||||
}
|
||||
|
@ -72,14 +70,12 @@ public class KafkaTopicApplication {
|
|||
Map<String, String> newTopicConfig = new HashMap<>();
|
||||
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
|
||||
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
|
||||
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
|
||||
.configs(newTopicConfig);
|
||||
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig);
|
||||
|
||||
CreateTopicsResult result = admin.createTopics(
|
||||
Collections.singleton(newTopic)
|
||||
);
|
||||
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
|
||||
|
||||
KafkaFuture<Void> future = result.values().get(topicName);
|
||||
KafkaFuture<Void> future = result.values()
|
||||
.get(topicName);
|
||||
future.get();
|
||||
}
|
||||
}
|
|
@ -19,9 +19,7 @@ public class CountryPopulationConsumer {
|
|||
private java.util.function.Consumer<Throwable> exceptionConsumer;
|
||||
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
|
||||
|
||||
public CountryPopulationConsumer(
|
||||
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
|
||||
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
|
||||
public CountryPopulationConsumer(Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer, java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
|
||||
this.consumer = consumer;
|
||||
this.exceptionConsumer = exceptionConsumer;
|
||||
this.countryPopulationConsumer = countryPopulationConsumer;
|
|
@ -1,4 +1,4 @@
|
|||
package com.baeldung.kafka;
|
||||
package com.baeldung.kafka.exactlyonce;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
@ -24,16 +24,16 @@ public class TransactionalMessageProducer {
|
|||
|
||||
producer.initTransactions();
|
||||
|
||||
try{
|
||||
try {
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
|
||||
new ProducerRecord<String, String>("input", null, s)));
|
||||
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
|
||||
.forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
|
||||
|
||||
producer.commitTransaction();
|
||||
|
||||
}catch (KafkaException e){
|
||||
} catch (KafkaException e) {
|
||||
|
||||
producer.abortTransaction();
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package com.baeldung.kafka;
|
||||
package com.baeldung.kafka.exactlyonce;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
|
@ -43,10 +43,11 @@ public class TransactionalWordCount {
|
|||
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
|
||||
|
||||
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
|
||||
.stream()
|
||||
.flatMap(record -> Stream.of(record.value().split(" ")))
|
||||
.map(word -> Tuple.of(word, 1))
|
||||
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
|
||||
.stream()
|
||||
.flatMap(record -> Stream.of(record.value()
|
||||
.split(" ")))
|
||||
.map(word -> Tuple.of(word, 1))
|
||||
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
|
@ -56,7 +57,8 @@ public class TransactionalWordCount {
|
|||
|
||||
for (TopicPartition partition : records.partitions()) {
|
||||
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
|
||||
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
|
||||
long offset = partitionedRecords.get(partitionedRecords.size() - 1)
|
||||
.offset();
|
||||
|
||||
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
|
||||
}
|
||||
|
@ -72,7 +74,6 @@ public class TransactionalWordCount {
|
|||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static KafkaConsumer<String, String> createKafkaConsumer() {
|
|
@ -1,4 +1,4 @@
|
|||
package com.baeldung.kafka;
|
||||
package com.baeldung.kafka.exactlyonce;
|
||||
|
||||
public class Tuple {
|
||||
|
||||
|
@ -10,8 +10,8 @@ public class Tuple {
|
|||
this.value = value;
|
||||
}
|
||||
|
||||
public static Tuple of(String key, Integer value){
|
||||
return new Tuple(key,value);
|
||||
public static Tuple of(String key, Integer value) {
|
||||
return new Tuple(key, value);
|
||||
}
|
||||
|
||||
public String getKey() {
|
|
@ -15,8 +15,7 @@ public class KafkaProducer {
|
|||
}
|
||||
|
||||
public Future<RecordMetadata> send(String key, String value) {
|
||||
ProducerRecord record = new ProducerRecord("topic_sports_news",
|
||||
key, value);
|
||||
ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
|
||||
return producer.send(record);
|
||||
}
|
||||
|
||||
|
@ -36,5 +35,4 @@ public class KafkaProducer {
|
|||
producer.commitTransaction();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.baeldung.kafka.streams;
|
||||
package com.baeldung.kafka.streamsvsconsumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
@ -1,61 +1,78 @@
|
|||
package com.baeldung.kafkastreams;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KStreamBuilder;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class KafkaStreamsLiveTest {
|
||||
private String bootstrapServers = "localhost:9092";
|
||||
private Path stateDirectory;
|
||||
|
||||
@Test
|
||||
@Ignore("it needs to have kafka broker running on local")
|
||||
public void shouldTestKafkaStreams() throws InterruptedException {
|
||||
//given
|
||||
// given
|
||||
String inputTopic = "inputTopic";
|
||||
|
||||
Properties streamsConfiguration = new Properties();
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
|
||||
.getClass()
|
||||
.getName());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
|
||||
.getClass()
|
||||
.getName());
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
||||
// Use a temporary directory for storing state, which will be automatically removed after the test.
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
|
||||
try {
|
||||
this.stateDirectory = Files.createTempDirectory("kafka-streams");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
|
||||
.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new UncheckedIOException("Cannot create temporary directory", e);
|
||||
}
|
||||
|
||||
//when
|
||||
KStreamBuilder builder = new KStreamBuilder();
|
||||
// when
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
KStream<String, String> textLines = builder.stream(inputTopic);
|
||||
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
|
||||
|
||||
KTable<String, Long> wordCounts = textLines
|
||||
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
|
||||
.groupBy((key, word) -> word)
|
||||
.count();
|
||||
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
|
||||
.groupBy((key, word) -> word)
|
||||
.count();
|
||||
|
||||
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
|
||||
wordCounts.toStream()
|
||||
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
|
||||
|
||||
String outputTopic = "outputTopic";
|
||||
final Serde<String> stringSerde = Serdes.String();
|
||||
final Serde<Long> longSerde = Serdes.Long();
|
||||
wordCounts.to(stringSerde, longSerde, outputTopic);
|
||||
|
||||
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
|
||||
wordCounts.toStream()
|
||||
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
|
||||
|
||||
final Topology topology = builder.build();
|
||||
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
|
||||
streams.start();
|
||||
|
||||
//then
|
||||
// then
|
||||
Thread.sleep(30000);
|
||||
streams.close();
|
||||
}
|
|
@ -183,7 +183,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<!-- meecrowave -->
|
||||
<plugin>
|
||||
<groupId>org.apache.meecrowave</groupId>
|
||||
|
@ -216,4 +215,4 @@
|
|||
<solr.solr-solrj.version>6.4.0</solr.solr-solrj.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
File diff suppressed because it is too large
Load Diff
|
@ -25,4 +25,5 @@
|
|||
<geode.core>1.6.0</geode.core>
|
||||
<rocketmq.version>2.0.4</rocketmq.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -91,7 +91,7 @@
|
|||
</plugins>
|
||||
</build>
|
||||
|
||||
<repositories>
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>SparkPackagesRepo</id>
|
||||
<url>https://repos.spark-packages.org</url>
|
||||
|
|
|
@ -77,7 +77,6 @@
|
|||
<optimize>true</optimize>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
|
@ -88,7 +87,6 @@
|
|||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- Run the application using "mvn jetty:run" -->
|
||||
<plugin>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
|
@ -119,7 +117,6 @@
|
|||
<id>jboss</id>
|
||||
<url>http://repository.jboss.org/nexus/content/groups/public/</url>
|
||||
</repository>
|
||||
|
||||
<!-- This repository is only needed when the Tapestry version is a preview release, rather than
|
||||
a final release. -->
|
||||
<repository>
|
||||
|
|
|
@ -68,4 +68,4 @@
|
|||
<asciidoctorj-pdf.plugin.version>1.5.0-alpha.15</asciidoctorj-pdf.plugin.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -31,4 +31,4 @@
|
|||
<atomix-all.version>1.0.0-rc9</atomix-all.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -20,4 +20,4 @@
|
|||
<module>todo-reminder/ToDoFunction</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -1,4 +1,5 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>helloworld</groupId>
|
||||
|
@ -6,10 +7,6 @@
|
|||
<version>1.0</version>
|
||||
<packaging>jar</packaging>
|
||||
<name>ToDoFunction</name>
|
||||
<properties>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
@ -18,9 +15,9 @@
|
|||
<version>1.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-lambda-java-events</artifactId>
|
||||
<version>3.6.0</version>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-lambda-java-events</artifactId>
|
||||
<version>3.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.org.webcompere</groupId>
|
||||
|
@ -58,10 +55,10 @@
|
|||
<version>5.0.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.1</version>
|
||||
<scope>test</scope>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.org.webcompere</groupId>
|
||||
|
@ -84,22 +81,28 @@
|
|||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<configuration>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<configuration>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -93,4 +93,5 @@
|
|||
<spring.version>2.2.1.RELEASE</spring.version>
|
||||
<awssdk.version>2.10.27</awssdk.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -83,7 +83,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>${assembly.plugin.version}</version>
|
||||
|
|
|
@ -48,13 +48,11 @@
|
|||
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>${maven-shade-plugin.version}</version>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -20,4 +19,4 @@
|
|||
<module>cf-uaa-oauth2-resource-server</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -173,4 +173,4 @@
|
|||
<groovy.compiler.version>3.3.0-01</groovy.compiler.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -2,3 +2,4 @@
|
|||
|
||||
- [String API Updates in Java 12](https://www.baeldung.com/java12-string-api)
|
||||
- [New Features in Java 12](https://www.baeldung.com/java-12-new-features)
|
||||
- [Compare the Content of Two Files in Java](https://www.baeldung.com/java-compare-files)
|
||||
|
|
|
@ -1,49 +1,60 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>core-java-12</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-12</name>
|
||||
<packaging>jar</packaging>
|
||||
<url>http://maven.apache.org</url>
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>core-java-12</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-12</name>
|
||||
<packaging>jar</packaging>
|
||||
<url>http://maven.apache.org</url>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>../../</relativePath>
|
||||
</parent>
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>../../</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>2.11.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>${maven.compiler.source.version}</source>
|
||||
<target>${maven.compiler.target.version}</target>
|
||||
<compilerArgs>--enable-preview</compilerArgs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>${maven.compiler.source.version}</source>
|
||||
<target>${maven.compiler.target.version}</target>
|
||||
<compilerArgs>--enable-preview</compilerArgs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<argLine>--enable-preview</argLine>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source.version>12</maven.compiler.source.version>
|
||||
<maven.compiler.target.version>12</maven.compiler.target.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
<properties>
|
||||
<maven.compiler.source.version>12</maven.compiler.source.version>
|
||||
<maven.compiler.target.version>12</maven.compiler.target.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,88 @@
|
|||
package com.baeldung.file.content.comparison;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class CompareFileContents {
|
||||
|
||||
public static long filesCompareByByte(Path path1, Path path2) throws IOException {
|
||||
|
||||
if (path1.getFileSystem()
|
||||
.provider()
|
||||
.isSameFile(path1, path2)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
try (BufferedInputStream fis1 = new BufferedInputStream(new FileInputStream(path1.toFile()));
|
||||
BufferedInputStream fis2 = new BufferedInputStream(new FileInputStream(path2.toFile()))) {
|
||||
int ch = 0;
|
||||
long pos = 1;
|
||||
while ((ch = fis1.read()) != -1) {
|
||||
if (ch != fis2.read()) {
|
||||
return pos;
|
||||
}
|
||||
pos++;
|
||||
}
|
||||
if (fis2.read() == -1) {
|
||||
return -1;
|
||||
} else {
|
||||
return pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static long filesCompareByLine(Path path1, Path path2) throws IOException {
|
||||
|
||||
if (path1.getFileSystem()
|
||||
.provider()
|
||||
.isSameFile(path1, path2)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
try (BufferedReader bf1 = Files.newBufferedReader(path1);
|
||||
BufferedReader bf2 = Files.newBufferedReader(path2)) {
|
||||
|
||||
long lineNumber = 1;
|
||||
String line1 = "", line2 = "";
|
||||
while ((line1 = bf1.readLine()) != null) {
|
||||
line2 = bf2.readLine();
|
||||
if (line2 == null || !line1.equals(line2)) {
|
||||
return lineNumber;
|
||||
}
|
||||
lineNumber++;
|
||||
}
|
||||
if (bf2.readLine() == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return lineNumber;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean compareByMemoryMappedFiles(Path path1, Path path2) throws IOException {
|
||||
|
||||
try (RandomAccessFile randomAccessFile1 = new RandomAccessFile(path1.toFile(), "r");
|
||||
RandomAccessFile randomAccessFile2 = new RandomAccessFile(path2.toFile(), "r")) {
|
||||
|
||||
FileChannel ch1 = randomAccessFile1.getChannel();
|
||||
FileChannel ch2 = randomAccessFile2.getChannel();
|
||||
if (ch1.size() != ch2.size()) {
|
||||
|
||||
return false;
|
||||
}
|
||||
long size = ch1.size();
|
||||
MappedByteBuffer m1 = ch1.map(FileChannel.MapMode.READ_ONLY, 0L, size);
|
||||
MappedByteBuffer m2 = ch2.map(FileChannel.MapMode.READ_ONLY, 0L, size);
|
||||
|
||||
return m1.equals(m2);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package com.baeldung.file.content.comparison;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.Reader;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class CampareFileContentsApacheIOUnitTest {
|
||||
|
||||
public static Path path1 = null;
|
||||
public static Path path2 = null;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() throws IOException {
|
||||
|
||||
path1 = Files.createTempFile("file1Test", ".txt");
|
||||
path2 = Files.createTempFile("file2Test", ".txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesIdentical_thenReturnTrue() throws IOException {
|
||||
|
||||
InputStream inputStream1 = new FileInputStream(path1.toFile());
|
||||
InputStream inputStream2 = new FileInputStream(path2.toFile());
|
||||
|
||||
Files.writeString(path1, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertTrue(IOUtils.contentEquals(inputStream1, inputStream2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesDifferent_thenReturnFalse() throws IOException {
|
||||
|
||||
InputStream inputStream1 = new FileInputStream(path1.toFile());
|
||||
InputStream inputStream2 = new FileInputStream(path2.toFile());
|
||||
|
||||
Files.writeString(path1, "testing line " + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertFalse(IOUtils.contentEquals(inputStream1, inputStream2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesIdenticalIgnoreEOF_thenReturnTrue() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing line 1 \n line 2");
|
||||
Files.writeString(path2, "testing line 1 \r\n line 2");
|
||||
|
||||
Reader reader1 = new BufferedReader(new FileReader(path1.toFile()));
|
||||
Reader reader2 = new BufferedReader(new FileReader(path2.toFile()));
|
||||
|
||||
assertTrue(IOUtils.contentEqualsIgnoreEOL(reader1, reader2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesNotIdenticalIgnoreEOF_thenReturnFalse() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing line \n line 2");
|
||||
Files.writeString(path2, "testing line 1 \r\n line 2");
|
||||
|
||||
Reader reader1 = new BufferedReader(new FileReader(path1.toFile()));
|
||||
Reader reader2 = new BufferedReader(new FileReader(path2.toFile()));
|
||||
|
||||
assertFalse(IOUtils.contentEqualsIgnoreEOL(reader1, reader2));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void shutDown() {
|
||||
|
||||
path1.toFile()
|
||||
.deleteOnExit();
|
||||
path2.toFile()
|
||||
.deleteOnExit();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package com.baeldung.file.content.comparison;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class CompareByMemoryMappedFilesUnitTest {
|
||||
|
||||
public static Path path1 = null;
|
||||
public static Path path2 = null;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() throws IOException {
|
||||
|
||||
path1 = Files.createTempFile("file1Test", ".txt");
|
||||
path2 = Files.createTempFile("file2Test", ".txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesIdentical_thenReturnTrue() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertTrue(CompareFileContents.compareByMemoryMappedFiles(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesDifferent_thenReturnFalse() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing line " + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertFalse(CompareFileContents.compareByMemoryMappedFiles(path1, path2));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package com.baeldung.file.content.comparison;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class CompareFileContentsByBytesUnitTest {
|
||||
|
||||
public static Path path1 = null;
|
||||
public static Path path2 = null;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() throws IOException {
|
||||
|
||||
path1 = Files.createTempFile("file1Test", ".txt");
|
||||
path2 = Files.createTempFile("file2Test", ".txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFirstFileShorter_thenPositionInSecondFile() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing");
|
||||
Files.writeString(path2, "testing1");
|
||||
|
||||
assertEquals(8, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSecondFileShorter_thenPositionInFirstFile() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing1");
|
||||
Files.writeString(path2, "testing");
|
||||
|
||||
assertEquals(8, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesIdentical_thenSuccess() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing");
|
||||
Files.writeString(path2, "testing");
|
||||
|
||||
assertEquals(-1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesDifferent_thenPosition() throws IOException {
|
||||
|
||||
Files.writeString(path1, "tesXing");
|
||||
Files.writeString(path2, "testing");
|
||||
|
||||
assertEquals(4, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenBothFilesEmpty_thenEqual() throws IOException {
|
||||
|
||||
Files.writeString(path1, "");
|
||||
Files.writeString(path2, "");
|
||||
|
||||
assertEquals(-1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFirstEmpty_thenPositionFirst() throws IOException {
|
||||
|
||||
Files.writeString(path1, "");
|
||||
Files.writeString(path2, "test");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSecondEmpty_thenPositionFirst() throws IOException {
|
||||
|
||||
Files.writeString(path1, "test");
|
||||
Files.writeString(path2, "");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void shutDown() {
|
||||
|
||||
path1.toFile().deleteOnExit();
|
||||
path2.toFile().deleteOnExit();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package com.baeldung.file.content.comparison;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class CompareFileContentsByLinesUnitTest {
|
||||
|
||||
public static Path path1 = null;
|
||||
public static Path path2 = null;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() throws IOException {
|
||||
|
||||
path1 = Files.createTempFile("file1Test", ".txt");
|
||||
path2 = Files.createTempFile("file2Test", ".txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFirstFileShorter_thenLineNumbersFirstFile() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing line 1");
|
||||
Files.writeString(path2, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByLine(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSecondFileShorter_thenLineNumbersSecondFile() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing line 1");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByLine(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFileIdentical_thenLineSuccess() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertEquals(-1, CompareFileContents.filesCompareByLine(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFilesDifferent_thenLineNumber() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "testing1 line 1" + System.lineSeparator() + "linX 2");
|
||||
|
||||
assertEquals(2, CompareFileContents.filesCompareByLine(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenBothFilesEmpty_thenEqual() throws IOException {
|
||||
|
||||
Files.writeString(path1, "");
|
||||
Files.writeString(path2, "");
|
||||
|
||||
assertEquals(-1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFirstEmpty_thenPositionFirst() throws IOException {
|
||||
|
||||
Files.writeString(path1, "");
|
||||
Files.writeString(path2, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSecondEmpty_thenPositionFirst() throws IOException {
|
||||
|
||||
Files.writeString(path1, "testing1 line 1" + System.lineSeparator() + "line 2");
|
||||
Files.writeString(path2, "");
|
||||
|
||||
assertEquals(1, CompareFileContents.filesCompareByByte(path1, path2));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void shutDown() {
|
||||
|
||||
path1.toFile().deleteOnExit();
|
||||
path2.toFile().deleteOnExit();
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
package java.com.baeldung.newfeatures;
|
||||
package com.baeldung.newfeatures;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.text.NumberFormat;
|
||||
import java.util.Locale;
|
||||
|
@ -16,6 +16,6 @@ public class CompactNumbersUnitTest {
|
|||
assertEquals("2.59K", likesShort.format(2592));
|
||||
NumberFormat likesLong = NumberFormat.getCompactNumberInstance(new Locale("en", "US"), NumberFormat.Style.LONG);
|
||||
likesLong.setMaximumFractionDigits(2);
|
||||
assertEquals("2.59 thousand", likesShort.format(2592));
|
||||
assertEquals("2.59 thousand", likesLong.format(2592));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package java.com.baeldung.newfeatures;
|
||||
package com.baeldung.newfeatures;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package java.com.baeldung.newfeatures;
|
||||
package com.baeldung.newfeatures;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package java.com.baeldung.newfeatures;
|
||||
package com.baeldung.newfeatures;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -13,6 +13,6 @@ public class TeeingCollectorUnitTest {
|
|||
public void givenSetOfNumbers_thenCalculateAverage() {
|
||||
double mean = Stream.of(1, 2, 3, 4, 5)
|
||||
.collect(Collectors.teeing(Collectors.summingDouble(i -> i), Collectors.counting(), (sum, count) -> sum / count));
|
||||
assertEquals(3.0, mean);
|
||||
assertEquals(3.0, mean, 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,48 +1,53 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>core-java-16</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-16</name>
|
||||
<packaging>jar</packaging>
|
||||
<url>http://maven.apache.org</url>
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>core-java-16</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-16</name>
|
||||
<packaging>jar</packaging>
|
||||
<url>http://maven.apache.org</url>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>../../</relativePath>
|
||||
</parent>
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>../../</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.12.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>${maven.compiler.source.version}</source>
|
||||
<target>${maven.compiler.target.version}</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>${maven.compiler.source.version}</source>
|
||||
<target>${maven.compiler.target.version}</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source.version>16</maven.compiler.source.version>
|
||||
<maven.compiler.target.version>16</maven.compiler.target.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
<properties>
|
||||
<maven.compiler.source.version>16</maven.compiler.source.version>
|
||||
<maven.compiler.target.version>16</maven.compiler.target.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,43 @@
|
|||
package com.baeldung.java_16_features.groupingby;
|
||||
|
||||
import java.util.IntSummaryStatistics;
|
||||
|
||||
|
||||
public class BlogPost {
|
||||
|
||||
private String title;
|
||||
private String author;
|
||||
private BlogPostType type;
|
||||
private int likes;
|
||||
record AuthPostTypesLikes(String author, BlogPostType type, int likes) {};
|
||||
record PostcountTitlesLikesStats(long postCount, String titles, IntSummaryStatistics likesStats){};
|
||||
record TitlesBoundedSumOfLikes(String titles, int boundedSumOfLikes) {};
|
||||
|
||||
public BlogPost(String title, String author, BlogPostType type, int likes) {
|
||||
this.title = title;
|
||||
this.author = author;
|
||||
this.type = type;
|
||||
this.likes = likes;
|
||||
}
|
||||
|
||||
public String getTitle() {
|
||||
return title;
|
||||
}
|
||||
|
||||
public String getAuthor() {
|
||||
return author;
|
||||
}
|
||||
|
||||
public BlogPostType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public int getLikes() {
|
||||
return likes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BlogPost{" + "title='" + title + '\'' + ", type=" + type + ", likes=" + likes + '}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package com.baeldung.java_16_features.groupingby;
|
||||
|
||||
public enum BlogPostType {
|
||||
NEWS, REVIEW, GUIDE
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package com.baeldung.java_16_features.groupingby;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class Tuple {
|
||||
private final BlogPostType type;
|
||||
private final String author;
|
||||
|
||||
public Tuple(BlogPostType type, String author) {
|
||||
this.type = type;
|
||||
this.author = author;
|
||||
}
|
||||
|
||||
public BlogPostType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getAuthor() {
|
||||
return author;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
Tuple tuple = (Tuple) o;
|
||||
return type == tuple.type && author.equals(tuple.author);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type, author);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Tuple{" + "type=" + type + ", author='" + author + '\'' + '}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,302 @@
|
|||
package com.baeldung.java_16_features.groupingby;
|
||||
|
||||
import static java.util.Comparator.comparingInt;
|
||||
import static java.util.stream.Collectors.averagingInt;
|
||||
import static java.util.stream.Collectors.counting;
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.groupingByConcurrent;
|
||||
import static java.util.stream.Collectors.collectingAndThen;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static java.util.stream.Collectors.joining;
|
||||
import static java.util.stream.Collectors.mapping;
|
||||
import static java.util.stream.Collectors.maxBy;
|
||||
import static java.util.stream.Collectors.summarizingInt;
|
||||
import static java.util.stream.Collectors.summingInt;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.offset;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumMap;
|
||||
import java.util.IntSummaryStatistics;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class JavaGroupingByCollectorUnitTest {
|
||||
|
||||
private static final List<BlogPost> posts = Arrays.asList(new BlogPost("News item 1", "Author 1", BlogPostType.NEWS, 15), new BlogPost("Tech review 1", "Author 2", BlogPostType.REVIEW, 5),
|
||||
new BlogPost("Programming guide", "Author 1", BlogPostType.GUIDE, 20), new BlogPost("News item 2", "Author 2", BlogPostType.NEWS, 35), new BlogPost("Tech review 2", "Author 1", BlogPostType.REVIEW, 15));
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByType_thenGetAMapBetweenTypeAndPosts() {
|
||||
Map<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType));
|
||||
|
||||
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
|
||||
.size());
|
||||
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndTheirTitlesAreJoinedInAString_thenGetAMapBetweenTypeAndCsvTitles() {
|
||||
Map<BlogPostType, String> postsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, mapping(BlogPost::getTitle, joining(", ", "Post titles: [", "]"))));
|
||||
|
||||
assertEquals("Post titles: [News item 1, News item 2]", postsPerType.get(BlogPostType.NEWS));
|
||||
assertEquals("Post titles: [Programming guide]", postsPerType.get(BlogPostType.GUIDE));
|
||||
assertEquals("Post titles: [Tech review 1, Tech review 2]", postsPerType.get(BlogPostType.REVIEW));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndSumTheLikes_thenGetAMapBetweenTypeAndPostLikes() {
|
||||
Map<BlogPostType, Integer> likesPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, summingInt(BlogPost::getLikes)));
|
||||
|
||||
assertEquals(50, likesPerType.get(BlogPostType.NEWS)
|
||||
.intValue());
|
||||
assertEquals(20, likesPerType.get(BlogPostType.REVIEW)
|
||||
.intValue());
|
||||
assertEquals(20, likesPerType.get(BlogPostType.GUIDE)
|
||||
.intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeInAnEnumMap_thenGetAnEnumMapBetweenTypeAndPosts() {
|
||||
EnumMap<BlogPostType, List<BlogPost>> postsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, () -> new EnumMap<>(BlogPostType.class), toList()));
|
||||
|
||||
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
|
||||
.size());
|
||||
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeInSets_thenGetAMapBetweenTypesAndSetsOfPosts() {
|
||||
Map<BlogPostType, Set<BlogPost>> postsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, toSet()));
|
||||
|
||||
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
|
||||
.size());
|
||||
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeConcurrently_thenGetAMapBetweenTypeAndPosts() {
|
||||
ConcurrentMap<BlogPostType, List<BlogPost>> postsPerType = posts.parallelStream()
|
||||
.collect(groupingByConcurrent(BlogPost::getType));
|
||||
|
||||
assertEquals(2, postsPerType.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, postsPerType.get(BlogPostType.GUIDE)
|
||||
.size());
|
||||
assertEquals(2, postsPerType.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndAveragingLikes_thenGetAMapBetweenTypeAndAverageNumberOfLikes() {
|
||||
Map<BlogPostType, Double> averageLikesPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, averagingInt(BlogPost::getLikes)));
|
||||
|
||||
assertEquals(25, averageLikesPerType.get(BlogPostType.NEWS)
|
||||
.intValue());
|
||||
assertEquals(20, averageLikesPerType.get(BlogPostType.GUIDE)
|
||||
.intValue());
|
||||
assertEquals(10, averageLikesPerType.get(BlogPostType.REVIEW)
|
||||
.intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndCounted_thenGetAMapBetweenTypeAndNumberOfPosts() {
|
||||
Map<BlogPostType, Long> numberOfPostsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, counting()));
|
||||
|
||||
assertEquals(2, numberOfPostsPerType.get(BlogPostType.NEWS)
|
||||
.intValue());
|
||||
assertEquals(1, numberOfPostsPerType.get(BlogPostType.GUIDE)
|
||||
.intValue());
|
||||
assertEquals(2, numberOfPostsPerType.get(BlogPostType.REVIEW)
|
||||
.intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndMaxingLikes_thenGetAMapBetweenTypeAndMaximumNumberOfLikes() {
|
||||
Map<BlogPostType, Optional<BlogPost>> maxLikesPerPostType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, maxBy(comparingInt(BlogPost::getLikes))));
|
||||
|
||||
assertTrue(maxLikesPerPostType.get(BlogPostType.NEWS)
|
||||
.isPresent());
|
||||
assertEquals(35, maxLikesPerPostType.get(BlogPostType.NEWS)
|
||||
.get()
|
||||
.getLikes());
|
||||
|
||||
assertTrue(maxLikesPerPostType.get(BlogPostType.GUIDE)
|
||||
.isPresent());
|
||||
assertEquals(20, maxLikesPerPostType.get(BlogPostType.GUIDE)
|
||||
.get()
|
||||
.getLikes());
|
||||
|
||||
assertTrue(maxLikesPerPostType.get(BlogPostType.REVIEW)
|
||||
.isPresent());
|
||||
assertEquals(15, maxLikesPerPostType.get(BlogPostType.REVIEW)
|
||||
.get()
|
||||
.getLikes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByAuthorAndThenByType_thenGetAMapBetweenAuthorAndMapsBetweenTypeAndBlogPosts() {
|
||||
Map<String, Map<BlogPostType, List<BlogPost>>> map = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getAuthor, groupingBy(BlogPost::getType)));
|
||||
|
||||
assertEquals(1, map.get("Author 1")
|
||||
.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, map.get("Author 1")
|
||||
.get(BlogPostType.GUIDE)
|
||||
.size());
|
||||
assertEquals(1, map.get("Author 1")
|
||||
.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
|
||||
assertEquals(1, map.get("Author 2")
|
||||
.get(BlogPostType.NEWS)
|
||||
.size());
|
||||
assertEquals(1, map.get("Author 2")
|
||||
.get(BlogPostType.REVIEW)
|
||||
.size());
|
||||
assertNull(map.get("Author 2")
|
||||
.get(BlogPostType.GUIDE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByTypeAndSummarizingLikes_thenGetAMapBetweenTypeAndSummary() {
|
||||
Map<BlogPostType, IntSummaryStatistics> likeStatisticsPerType = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getType, summarizingInt(BlogPost::getLikes)));
|
||||
|
||||
IntSummaryStatistics newsLikeStatistics = likeStatisticsPerType.get(BlogPostType.NEWS);
|
||||
|
||||
assertEquals(2, newsLikeStatistics.getCount());
|
||||
assertEquals(50, newsLikeStatistics.getSum());
|
||||
assertEquals(25.0, newsLikeStatistics.getAverage(), 0.001);
|
||||
assertEquals(35, newsLikeStatistics.getMax());
|
||||
assertEquals(15, newsLikeStatistics.getMin());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByComplexMapPairKeyType_thenGetAMapBetweenPairAndList() {
|
||||
|
||||
Map<Pair<BlogPostType, String>, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
|
||||
.collect(groupingBy(post -> new ImmutablePair<>(post.getType(), post.getAuthor())));
|
||||
|
||||
List<BlogPost> result = postsPerTypeAndAuthor.get(new ImmutablePair<>(BlogPostType.GUIDE, "Author 1"));
|
||||
|
||||
assertThat(result.size()).isEqualTo(1);
|
||||
|
||||
BlogPost blogPost = result.get(0);
|
||||
|
||||
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
|
||||
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
|
||||
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByComplexMapKeyType_thenGetAMapBetweenTupleAndList() {
|
||||
|
||||
Map<Tuple, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
|
||||
.collect(groupingBy(post -> new Tuple(post.getType(), post.getAuthor())));
|
||||
|
||||
List<BlogPost> result = postsPerTypeAndAuthor.get(new Tuple(BlogPostType.GUIDE, "Author 1"));
|
||||
|
||||
assertThat(result.size()).isEqualTo(1);
|
||||
|
||||
BlogPost blogPost = result.get(0);
|
||||
|
||||
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
|
||||
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
|
||||
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAListOfPosts_whenGroupedByRecord_thenGetAMapBetweenRecordAndList() {
|
||||
|
||||
Map<BlogPost.AuthPostTypesLikes, List<BlogPost>> postsPerTypeAndAuthor = posts.stream()
|
||||
.collect(groupingBy(post -> new BlogPost.AuthPostTypesLikes(post.getAuthor(), post.getType(), post.getLikes())));
|
||||
|
||||
List<BlogPost> result = postsPerTypeAndAuthor.get(new BlogPost.AuthPostTypesLikes("Author 1", BlogPostType.GUIDE, 20));
|
||||
|
||||
assertThat(result.size()).isEqualTo(1);
|
||||
|
||||
BlogPost blogPost = result.get(0);
|
||||
|
||||
assertThat(blogPost.getTitle()).isEqualTo("Programming guide");
|
||||
assertThat(blogPost.getType()).isEqualTo(BlogPostType.GUIDE);
|
||||
assertThat(blogPost.getAuthor()).isEqualTo("Author 1");
|
||||
assertThat(blogPost.getLikes()).isEqualTo(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenListOfPosts_whenGroupedByAuthor_thenGetAMapUsingCollectingAndThen() {
|
||||
|
||||
Map<String, BlogPost.PostcountTitlesLikesStats> postsPerAuthor = posts.stream()
|
||||
.collect(groupingBy(BlogPost::getAuthor, collectingAndThen(toList(), list -> {
|
||||
long count = list.stream()
|
||||
.map(BlogPost::getTitle)
|
||||
.collect(counting());
|
||||
String titles = list.stream()
|
||||
.map(BlogPost::getTitle)
|
||||
.collect(joining(" : "));
|
||||
IntSummaryStatistics summary = list.stream()
|
||||
.collect(summarizingInt(BlogPost::getLikes));
|
||||
return new BlogPost.PostcountTitlesLikesStats(count, titles, summary);
|
||||
})));
|
||||
|
||||
BlogPost.PostcountTitlesLikesStats result = postsPerAuthor.get("Author 1");
|
||||
assertThat(result.postCount()).isEqualTo(3L);
|
||||
assertThat(result.titles()).isEqualTo("News item 1 : Programming guide : Tech review 2");
|
||||
assertThat(result.likesStats().getMax()).isEqualTo(20);
|
||||
assertThat(result.likesStats().getMin()).isEqualTo(15);
|
||||
assertThat(result.likesStats().getAverage()).isEqualTo(16.666d, offset(0.001d));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenListOfPosts_whenGroupedByAuthor_thenGetAMapUsingToMap() {
|
||||
|
||||
int maxValLikes = 17;
|
||||
Map<String, BlogPost.TitlesBoundedSumOfLikes> postsPerAuthor = posts.stream()
|
||||
.collect(toMap(BlogPost::getAuthor, post -> {
|
||||
int likes = (post.getLikes() > maxValLikes) ? maxValLikes : post.getLikes();
|
||||
return new BlogPost.TitlesBoundedSumOfLikes(post.getTitle(), likes);
|
||||
}, (u1, u2) -> {
|
||||
int likes = (u2.boundedSumOfLikes() > maxValLikes) ? maxValLikes : u2.boundedSumOfLikes();
|
||||
return new BlogPost.TitlesBoundedSumOfLikes(u1.titles()
|
||||
.toUpperCase() + " : "
|
||||
+ u2.titles()
|
||||
.toUpperCase(),
|
||||
u1.boundedSumOfLikes() + likes);
|
||||
}));
|
||||
|
||||
BlogPost.TitlesBoundedSumOfLikes result = postsPerAuthor.get("Author 1");
|
||||
assertThat(result.titles()).isEqualTo("NEWS ITEM 1 : PROGRAMMING GUIDE : TECH REVIEW 2");
|
||||
assertThat(result.boundedSumOfLikes()).isEqualTo(47);
|
||||
}
|
||||
}
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-2</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -31,7 +31,6 @@
|
|||
<artifactId>jmh-generator-annprocess</artifactId>
|
||||
<version>${jmh-generator.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-advanced</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-basic-2</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -189,21 +189,8 @@ public class StopExecution {
|
|||
longRunningSort();
|
||||
}
|
||||
|
||||
private void longRunningOperation() {
|
||||
LOG.info("long Running operation started");
|
||||
|
||||
try {
|
||||
//Thread.sleep(500);
|
||||
longFileRead();
|
||||
LOG.info("long running operation finished");
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("long Running operation interrupted");
|
||||
}
|
||||
}
|
||||
|
||||
private void longRunningSort() {
|
||||
LOG.info("long Running task started");
|
||||
// Do you long running calculation here
|
||||
LOG.info("Long running task started");
|
||||
int len = 100000;
|
||||
List<Integer> numbers = new ArrayList<>();
|
||||
try {
|
||||
|
@ -229,25 +216,7 @@ public class StopExecution {
|
|||
LOG.info("Index position: " + i);
|
||||
LOG.info("Long running task finished");
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("long Running operation interrupted");
|
||||
}
|
||||
}
|
||||
|
||||
private void longFileRead() throws InterruptedException {
|
||||
String file = "input.txt";
|
||||
ClassLoader classloader = getClass().getClassLoader();
|
||||
|
||||
try (InputStream inputStream = classloader.getResourceAsStream(file)) {
|
||||
Reader inputStreamReader = new InputStreamReader(inputStream);
|
||||
|
||||
int data = inputStreamReader.read();
|
||||
while (data != -1) {
|
||||
char theChar = (char) data;
|
||||
data = inputStreamReader.read();
|
||||
throwExceptionOnThreadInterrupt();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception: ", e);
|
||||
LOG.info("Long running operation interrupted");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-basic</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-collections-2</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-collections</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -49,7 +49,6 @@
|
|||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
|
|
@ -13,12 +13,9 @@ import java.util.stream.IntStream;
|
|||
public class TimeApi {
|
||||
|
||||
public static List<Date> getDatesBetweenUsingJava7(Date startDate, Date endDate) {
|
||||
List<Date> datesInRange = new ArrayList<Date>();
|
||||
Calendar calendar = new GregorianCalendar();
|
||||
calendar.setTime(startDate);
|
||||
|
||||
Calendar endCalendar = new GregorianCalendar();
|
||||
endCalendar.setTime(endDate);
|
||||
List<Date> datesInRange = new ArrayList<>();
|
||||
Calendar calendar = getCalendarWithoutTime(startDate);
|
||||
Calendar endCalendar = getCalendarWithoutTime(endDate);
|
||||
|
||||
while (calendar.before(endCalendar)) {
|
||||
Date result = calendar.getTime();
|
||||
|
@ -40,4 +37,15 @@ public class TimeApi {
|
|||
return startDate.datesUntil(endDate).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static Calendar getCalendarWithoutTime(Date date) {
|
||||
Calendar calendar = new GregorianCalendar();
|
||||
calendar.setTime(date);
|
||||
calendar.set(Calendar.HOUR, 0);
|
||||
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
||||
calendar.set(Calendar.MINUTE, 0);
|
||||
calendar.set(Calendar.SECOND, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
return calendar;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
package com.baeldung.java9.time;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import org.junit.Test;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class TimeApiUnitTest {
|
||||
|
||||
|
@ -18,19 +19,18 @@ public class TimeApiUnitTest {
|
|||
Date endDate = endCalendar.getTime();
|
||||
|
||||
List<Date> dates = TimeApi.getDatesBetweenUsingJava7(startDate, endDate);
|
||||
assertEquals(dates.size(), 2);
|
||||
|
||||
assertThat(dates).hasSize(2);
|
||||
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
Date date1 = calendar.getTime();
|
||||
assertEquals(dates.get(0).getDay(), date1.getDay());
|
||||
assertEquals(dates.get(0).getMonth(), date1.getMonth());
|
||||
assertEquals(dates.get(0).getYear(), date1.getYear());
|
||||
Date expectedDate1 = calendar.getTime();
|
||||
assertThat(dates.get(0)).isInSameDayAs(expectedDate1);
|
||||
assertThatTimeFieldsAreZero(dates.get(0));
|
||||
|
||||
calendar.add(Calendar.DATE, 1);
|
||||
Date date2 = calendar.getTime();
|
||||
assertEquals(dates.get(1).getDay(), date2.getDay());
|
||||
assertEquals(dates.get(1).getMonth(), date2.getMonth());
|
||||
assertEquals(dates.get(1).getYear(), date2.getYear());
|
||||
Date expectedDate2 = calendar.getTime();
|
||||
assertThat(dates.get(1)).isInSameDayAs(expectedDate2);
|
||||
assertThatTimeFieldsAreZero(dates.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -39,9 +39,8 @@ public class TimeApiUnitTest {
|
|||
LocalDate endDate = LocalDate.now().plusDays(2);
|
||||
|
||||
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava8(startDate, endDate);
|
||||
assertEquals(dates.size(), 2);
|
||||
assertEquals(dates.get(0), LocalDate.now());
|
||||
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
|
||||
|
||||
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -50,9 +49,15 @@ public class TimeApiUnitTest {
|
|||
LocalDate endDate = LocalDate.now().plusDays(2);
|
||||
|
||||
List<LocalDate> dates = TimeApi.getDatesBetweenUsingJava9(startDate, endDate);
|
||||
assertEquals(dates.size(), 2);
|
||||
assertEquals(dates.get(0), LocalDate.now());
|
||||
assertEquals(dates.get(1), LocalDate.now().plusDays(1));
|
||||
|
||||
assertThat(dates).containsExactly(LocalDate.now(), LocalDate.now().plusDays(1));
|
||||
}
|
||||
|
||||
private static void assertThatTimeFieldsAreZero(Date date) {
|
||||
assertThat(date).hasHourOfDay(0);
|
||||
assertThat(date).hasMinute(0);
|
||||
assertThat(date).hasSecond(0);
|
||||
assertThat(date).hasMillisecond(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>${project.parent.version}</version>
|
||||
<name>core-java-date-operations-2</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -48,7 +48,6 @@
|
|||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
|
|
@ -75,7 +75,7 @@
|
|||
|
||||
<properties>
|
||||
<commons-validator.version>1.6</commons-validator.version>
|
||||
<joda-time.version>2.10</joda-time.version>
|
||||
<joda-time.version>2.10.10</joda-time.version>
|
||||
<!-- testing -->
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
<maven.compiler.source>1.9</maven.compiler.source>
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
package com.baeldung.formatduration;
|
||||
|
||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class FormatDurationUnitTest {
|
||||
|
||||
|
||||
@Test
|
||||
public void givenInterval_WhenFormatInterval_formatDuration() {
|
||||
long HH = TimeUnit.MILLISECONDS.toHours(38114000);
|
||||
long MM = TimeUnit.MILLISECONDS.toMinutes(38114000) % 60;
|
||||
long SS = TimeUnit.MILLISECONDS.toSeconds(38114000) % 60;
|
||||
String timeInHHMMSS = String.format("%02d:%02d:%02d", HH, MM, SS);
|
||||
|
||||
assertThat(timeInHHMMSS).isEqualTo("10:35:14");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenInterval_WhenFormatUsingDuration_formatDuration() {
|
||||
Duration duration = Duration.ofMillis(38114000);
|
||||
long seconds = duration.getSeconds();
|
||||
long HH = seconds / 3600;
|
||||
long MM = (seconds % 3600) / 60;
|
||||
long SS = seconds % 60;
|
||||
String timeInHHMMSS = String.format("%02d:%02d:%02d", HH, MM, SS);
|
||||
assertThat(timeInHHMMSS).isEqualTo("10:35:14");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void givenInterval_WhenFormatDurationUsingApacheCommons_formatDuration() {
|
||||
assertThat(DurationFormatUtils.formatDuration(38114000, "HH:mm:ss"))
|
||||
.isEqualTo("10:35:14");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenInterval_WhenFormatDurationUsingJodaTime_formatDuration() {
|
||||
org.joda.time.Duration duration = new org.joda.time.Duration(38114000);
|
||||
Period period = duration.toPeriod();
|
||||
long HH = period.getHours();
|
||||
long MM = period.getMinutes();
|
||||
long SS = period.getSeconds();
|
||||
|
||||
String timeInHHMMSS = String.format("%02d:%02d:%02d", HH, MM, SS);
|
||||
assertThat(timeInHHMMSS).isEqualTo("10:35:14");
|
||||
}
|
||||
}
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-function</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
@ -40,4 +40,4 @@
|
|||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
</project>
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-functional</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-io-2</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-io-apis</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-io-conversions</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.core-java-modules</groupId>
|
||||
<artifactId>core-java-modules</artifactId>
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue