BAEL-936 code for akka_streams article (#1939)
* BAEL-936 code for akka_streams article * rename
This commit is contained in:
parent
c17d19ff21
commit
71ddfeecb7
|
@ -0,0 +1,30 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>parent-modules</artifactId>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>akka-streams</artifactId>
|
||||||
|
<name>akka-streams</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.typesafe.akka</groupId>
|
||||||
|
<artifactId>akka-stream_2.11</artifactId>
|
||||||
|
<version>${akkastreams.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.typesafe.akka</groupId>
|
||||||
|
<artifactId>akka-stream-testkit_2.11</artifactId>
|
||||||
|
<version>${akkastreams.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
<properties>
|
||||||
|
<akkastreams.version>2.5.2</akkastreams.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.baeldung.akkastreams;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
public class AverageRepository {
|
||||||
|
CompletionStage<Double> save(Double average) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
|
System.out.println("saving average: " + average);
|
||||||
|
return average;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
package com.baeldung.akkastreams;
|
||||||
|
|
||||||
|
|
||||||
|
import akka.Done;
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.stream.ActorMaterializer;
|
||||||
|
import akka.stream.javadsl.Flow;
|
||||||
|
import akka.stream.javadsl.Keep;
|
||||||
|
import akka.stream.javadsl.Sink;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class DataImporter {
|
||||||
|
private final ActorSystem actorSystem;
|
||||||
|
private final AverageRepository averageRepository = new AverageRepository();
|
||||||
|
|
||||||
|
public DataImporter(ActorSystem actorSystem) {
|
||||||
|
this.actorSystem = actorSystem;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Integer> parseLine(String line) {
|
||||||
|
String[] fields = line.split(";");
|
||||||
|
return Arrays.stream(fields)
|
||||||
|
.map(Integer::parseInt)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Flow<String, Integer, NotUsed> parseContent() {
|
||||||
|
return Flow.of(String.class).mapConcat(this::parseLine);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Flow<Integer, Double, NotUsed> computeAverage() {
|
||||||
|
return Flow.of(Integer.class).grouped(2).mapAsyncUnordered(8, integers ->
|
||||||
|
CompletableFuture.supplyAsync(() -> integers
|
||||||
|
.stream()
|
||||||
|
.mapToDouble(v -> v)
|
||||||
|
.average()
|
||||||
|
.orElse(-1.0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Flow<String, Double, NotUsed> calculateAverage() {
|
||||||
|
return Flow.of(String.class)
|
||||||
|
.via(parseContent())
|
||||||
|
.via(computeAverage());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Sink<Double, CompletionStage<Done>> storeAverages() {
|
||||||
|
return Flow.of(Double.class)
|
||||||
|
.mapAsyncUnordered(4, averageRepository::save)
|
||||||
|
.toMat(Sink.ignore(), Keep.right());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
CompletionStage<Done> calculateAverageForContent(String content) {
|
||||||
|
return Source.single(content)
|
||||||
|
.via(calculateAverage())
|
||||||
|
.runWith(storeAverages(), ActorMaterializer.create(actorSystem))
|
||||||
|
.whenComplete((d, e) -> {
|
||||||
|
if (d != null) {
|
||||||
|
System.out.println("Import finished ");
|
||||||
|
} else {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package com.baeldung.akkastreams;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.stream.ActorMaterializer;
|
||||||
|
import akka.stream.javadsl.Flow;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
import akka.stream.testkit.javadsl.TestSink;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class DataImporterUnitTest {
|
||||||
|
private final ActorSystem actorSystem = ActorSystem.create();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
|
||||||
|
//given
|
||||||
|
Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
|
||||||
|
String input = "1;9;11;0";
|
||||||
|
|
||||||
|
//when
|
||||||
|
Source<Double, NotUsed> flow = Source.single(input).via(tested);
|
||||||
|
|
||||||
|
//then
|
||||||
|
flow
|
||||||
|
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
|
||||||
|
.request(4)
|
||||||
|
.expectNextUnordered(5d, 5.5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenStreamOfIntegers_whenCalculateAverageAndSaveToSink_thenShouldFinishSuccessfully() {
|
||||||
|
//given
|
||||||
|
DataImporter dataImporter = new DataImporter(actorSystem);
|
||||||
|
String input = "10;90;110;10";
|
||||||
|
|
||||||
|
//when
|
||||||
|
dataImporter.calculateAverageForContent(input)
|
||||||
|
.thenAccept(d -> actorSystem.terminate());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue