diff --git a/akka-streams/pom.xml b/akka-streams/pom.xml new file mode 100644 index 0000000000..b1471641f7 --- /dev/null +++ b/akka-streams/pom.xml @@ -0,0 +1,30 @@ + + + + parent-modules + com.baeldung + 1.0.0-SNAPSHOT + + 4.0.0 + akka-streams + akka-streams + + + + com.typesafe.akka + akka-stream_2.11 + ${akkastreams.version} + + + com.typesafe.akka + akka-stream-testkit_2.11 + ${akkastreams.version} + + + + 2.5.2 + + + \ No newline at end of file diff --git a/akka-streams/src/main/java/com/baeldung/akkastreams/AverageRepository.java b/akka-streams/src/main/java/com/baeldung/akkastreams/AverageRepository.java new file mode 100644 index 0000000000..8cfae6d52a --- /dev/null +++ b/akka-streams/src/main/java/com/baeldung/akkastreams/AverageRepository.java @@ -0,0 +1,14 @@ +package com.baeldung.akkastreams; + + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class AverageRepository { + CompletionStage save(Double average) { + return CompletableFuture.supplyAsync(() -> { + System.out.println("saving average: " + average); + return average; + }); + } +} diff --git a/akka-streams/src/main/java/com/baeldung/akkastreams/DataImporter.java b/akka-streams/src/main/java/com/baeldung/akkastreams/DataImporter.java new file mode 100644 index 0000000000..92701e4e37 --- /dev/null +++ b/akka-streams/src/main/java/com/baeldung/akkastreams/DataImporter.java @@ -0,0 +1,74 @@ +package com.baeldung.akkastreams; + + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; + +public class DataImporter { + private final ActorSystem actorSystem; + private final AverageRepository averageRepository = new AverageRepository(); + + public DataImporter(ActorSystem actorSystem) { + this.actorSystem = actorSystem; + + } + + private List parseLine(String line) { + String[] fields = line.split(";"); + return Arrays.stream(fields) + .map(Integer::parseInt) + .collect(Collectors.toList()); + } + + private Flow parseContent() { + return Flow.of(String.class).mapConcat(this::parseLine); + } + + private Flow computeAverage() { + return Flow.of(Integer.class).grouped(2).mapAsyncUnordered(8, integers -> + CompletableFuture.supplyAsync(() -> integers + .stream() + .mapToDouble(v -> v) + .average() + .orElse(-1.0))); + } + + Flow calculateAverage() { + return Flow.of(String.class) + .via(parseContent()) + .via(computeAverage()); + } + + private Sink> storeAverages() { + return Flow.of(Double.class) + .mapAsyncUnordered(4, averageRepository::save) + .toMat(Sink.ignore(), Keep.right()); + } + + + CompletionStage calculateAverageForContent(String content) { + return Source.single(content) + .via(calculateAverage()) + .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) + .whenComplete((d, e) -> { + if (d != null) { + System.out.println("Import finished "); + } else { + e.printStackTrace(); + } + }); + } + +} diff --git a/akka-streams/src/test/java/com/baeldung/akkastreams/DataImporterUnitTest.java b/akka-streams/src/test/java/com/baeldung/akkastreams/DataImporterUnitTest.java new file mode 100644 index 0000000000..c47d74eae5 --- /dev/null +++ b/akka-streams/src/test/java/com/baeldung/akkastreams/DataImporterUnitTest.java @@ -0,0 +1,43 @@ +package com.baeldung.akkastreams; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; +import akka.stream.testkit.javadsl.TestSink; +import org.junit.Test; + + +public class DataImporterUnitTest { + private final ActorSystem actorSystem = ActorSystem.create(); + + @Test + public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { + //given + Flow tested = new DataImporter(actorSystem).calculateAverage(); + String input = "1;9;11;0"; + + //when + Source flow = Source.single(input).via(tested); + + //then + flow + .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) + .request(4) + .expectNextUnordered(5d, 5.5); + } + + @Test + public void givenStreamOfIntegers_whenCalculateAverageAndSaveToSink_thenShouldFinishSuccessfully() { + //given + DataImporter dataImporter = new DataImporter(actorSystem); + String input = "10;90;110;10"; + + //when + dataImporter.calculateAverageForContent(input) + .thenAccept(d -> actorSystem.terminate()); + + } + +} \ No newline at end of file