From 969cedfecd6801d2b97a507664802ef0862e6c8a Mon Sep 17 00:00:00 2001 From: Adrian Maghear Date: Sat, 3 Oct 2020 21:28:40 +0200 Subject: [PATCH 1/3] [BAEL-3600] setup initial mantis job --- netflix-modules/mantis/pom.xml | 64 +++++++++++++++++++ .../netflix/mantis/MantisApplication.java | 24 +++++++ .../netflix/mantis/job/LogAggregationJob.java | 30 +++++++++ .../netflix/mantis/job/LogCollectingJob.java | 31 +++++++++ .../netflix/mantis/model/LogAggregate.java | 27 ++++++++ .../netflix/mantis/model/LogEvent.java | 36 +++++++++++ .../baeldung/netflix/mantis/sink/LogSink.java | 37 +++++++++++ .../mantis/source/RandomLogSource.java | 46 +++++++++++++ .../netflix/mantis/stage/CountLogStage.java | 58 +++++++++++++++++ .../netflix/mantis/stage/GroupLogStage.java | 25 ++++++++ .../mantis/stage/TransformLogStage.java | 24 +++++++ netflix-modules/pom.xml | 1 + 12 files changed, 403 insertions(+) create mode 100644 netflix-modules/mantis/pom.xml create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java create mode 100644 netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml new file mode 100644 index 0000000000..48151c142f --- /dev/null +++ b/netflix-modules/mantis/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + mantis + Mantis + jar + Sample project for Netflix Mantis + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../../parent-boot-2 + + + + + + org.springframework.boot + spring-boot-starter + 2.1.3.RELEASE + + + + io.mantisrx + mantis-runtime + 1.2.63 + + + org.slf4j + slf4j-log4j12 + + + + + + com.fasterxml.jackson.core + jackson-databind + 2.10.2 + + + + net.andreinc.mockneat + mockneat + 0.3.8 + + + + org.projectlombok + lombok + 1.18.12 + + + + + + + SpringLibReleaseRepo + https://repo.spring.io/libs-release/ + + + + diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java new file mode 100644 index 0000000000..ad2b7f9aed --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java @@ -0,0 +1,24 @@ +package com.baeldung.netflix.mantis; + +import com.baeldung.netflix.mantis.job.LogAggregationJob; +import com.baeldung.netflix.mantis.job.LogCollectingJob; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@SpringBootApplication +public class MantisApplication implements CommandLineRunner { + + public static void main(String[] args) { + SpringApplication.run(MantisApplication.class, args); + } + + @Override + public void run(String... args) { + LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance()); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java new file mode 100644 index 0000000000..229d11d39d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java @@ -0,0 +1,30 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.CountLogStage; +import com.baeldung.netflix.mantis.stage.GroupLogStage; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.sink.Sinks; + +public class LogAggregationJob extends MantisJobProvider { + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), TransformLogStage.stageConfig()) + .stage(new GroupLogStage(), GroupLogStage.config()) + .stage(new CountLogStage(), CountLogStage.config()) + .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java new file mode 100644 index 0000000000..9bb6ae7f6d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -0,0 +1,31 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.sink.Sinks; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LogCollectingJob extends MantisJobProvider { + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) +// .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) + .sink(new LogSink()) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java new file mode 100644 index 0000000000..a9f1818b9a --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -0,0 +1,27 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class LogAggregate implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private final Integer count; + private final String level; + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java new file mode 100644 index 0000000000..36665b0264 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java @@ -0,0 +1,36 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class LogEvent implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private Long index; + private String level; + private String message; + + public LogEvent(String[] parts) { + this.index = Long.valueOf(parts[0]); + this.level = parts[1]; + this.message = parts[2]; + } + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java new file mode 100644 index 0000000000..ae2177bf87 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java @@ -0,0 +1,37 @@ +package com.baeldung.netflix.mantis.sink; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.SelfDocumentingSink; +import io.mantisrx.runtime.sink.ServerSentEventsSink; +import io.mantisrx.runtime.sink.Sink; +import io.mantisrx.runtime.sink.predicate.Predicate; +import rx.Observable; + +public class LogSink implements Sink { + + @Override + public void call(Context context, PortRequest portRequest, Observable logEventObservable) { + + SelfDocumentingSink sink = new ServerSentEventsSink.Builder() + .withEncoder(LogEvent::toJsonString) + .withPredicate(filterByLogMessage()) + .build(); + + logEventObservable.subscribe(); + + sink.call(context, portRequest, logEventObservable); + } + + private Predicate filterByLogMessage() { + return new Predicate<>("filter by message", + parameters -> { + if (parameters != null && parameters.containsKey("filter")) { + return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); + } + return logEvent -> true; + }); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java new file mode 100644 index 0000000000..fe607d9866 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java @@ -0,0 +1,46 @@ +package com.baeldung.netflix.mantis.source; + +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.source.Index; +import io.mantisrx.runtime.source.Source; +import lombok.extern.slf4j.Slf4j; +import net.andreinc.mockneat.MockNeat; +import rx.Observable; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class RandomLogSource implements Source { + + private MockNeat mockDataGenerator; + + @Override + public void init(Context context, Index index) { + mockDataGenerator = MockNeat.threadLocal(); + } + + @Override + public Observable> call(Context context, Index index) { + return Observable.just( + Observable + .interval(250, TimeUnit.MILLISECONDS) + .map(this::createRandomLogEvent)); + } + + private String createRandomLogEvent(Long tick) { + String level = mockDataGenerator.probabilites(String.class) + .add(0.5, "INFO") + .add(0.3, "WARN") + .add(0.2, "ERROR") + .get(); + + String message = mockDataGenerator.probabilites(String.class) + .add(0.5, "login attempt") + .add(0.5, "user created") + .get(); + + return tick + "#" + level + "#" + message; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java new file mode 100644 index 0000000000..5f02d783d0 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java @@ -0,0 +1,58 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.GroupToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.GroupToScalarComputation; +import io.mantisrx.runtime.parameter.ParameterDefinition; +import io.mantisrx.runtime.parameter.type.IntParameter; +import io.mantisrx.runtime.parameter.validator.Validators; +import rx.Observable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class CountLogStage implements GroupToScalarComputation { + + private int duration; + + @Override + public void init(Context context) { + duration = (int)context.getParameters().get("LogAggregationDuration", 1000); + } + + @Override + public Observable call(Context context, Observable> mantisGroup) { + return mantisGroup + .window(duration, TimeUnit.MILLISECONDS) + .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) + .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) + .map((count) -> new LogAggregate(count, group.getKey())) + )); + } + + public static GroupToScalar.Config config(){ + return new GroupToScalar.Config() + .description("sum events for a log level") + .codec(JacksonCodecs.pojo(LogAggregate.class)) + .withParameters(getParameters()); + } + + public static List> getParameters() { + List> params = new ArrayList<>(); + + params.add(new IntParameter() + .name("LogAggregationDuration") + .description("window size for aggregation in milliseconds") + .validator(Validators.range(100, 10000)) + .defaultValue(5000) + .build()) ; + + return params; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java new file mode 100644 index 0000000000..f21c4aba7d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java @@ -0,0 +1,25 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToGroup; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ToGroupComputation; +import rx.Observable; + +public class GroupLogStage implements ToGroupComputation { + + @Override + public Observable> call(Context context, Observable logEvent) { + return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log)); + } + + public static ScalarToGroup.Config config(){ + return new ScalarToGroup.Config() + .description("Group event data by level") + .codec(JacksonCodecs.pojo(LogEvent.class)) + .concurrentInput(); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java new file mode 100644 index 0000000000..33e6567d13 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java @@ -0,0 +1,24 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ScalarComputation; +import rx.Observable; + +public class TransformLogStage implements ScalarComputation { + + @Override + public Observable call(Context context, Observable logEntry) { + return logEntry + .map(log -> log.split("#")) + .filter(parts -> parts.length == 3) + .map(LogEvent::new); + } + + public static ScalarToScalar.Config stageConfig() { + return new ScalarToScalar.Config() + .codec(JacksonCodecs.pojo(LogEvent.class)); + } +} diff --git a/netflix-modules/pom.xml b/netflix-modules/pom.xml index 9ed22498d8..538126fb34 100644 --- a/netflix-modules/pom.xml +++ b/netflix-modules/pom.xml @@ -15,6 +15,7 @@ genie + mantis \ No newline at end of file From 795bcbe2b78213abcda781536e7d9a3a0b7d8ece Mon Sep 17 00:00:00 2001 From: Adrian Maghear Date: Sun, 4 Oct 2020 12:53:41 +0200 Subject: [PATCH 2/3] [BAEL-3600] small fixes --- .../java/com/baeldung/netflix/mantis/MantisApplication.java | 1 - .../java/com/baeldung/netflix/mantis/job/LogCollectingJob.java | 2 -- .../java/com/baeldung/netflix/mantis/model/LogAggregate.java | 1 - .../main/java/com/baeldung/netflix/mantis/model/LogEvent.java | 1 - 4 files changed, 5 deletions(-) diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java index ad2b7f9aed..d5ffe977c3 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java @@ -1,7 +1,6 @@ package com.baeldung.netflix.mantis; import com.baeldung.netflix.mantis.job.LogAggregationJob; -import com.baeldung.netflix.mantis.job.LogCollectingJob; import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java index 9bb6ae7f6d..492f30c43a 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -9,7 +9,6 @@ import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.ScalarToScalar; -import io.mantisrx.runtime.sink.Sinks; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -21,7 +20,6 @@ public class LogCollectingJob extends MantisJobProvider { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) -// .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .sink(new LogSink()) .metadata(new Metadata.Builder().build()) .create(); diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java index a9f1818b9a..0eeb7ea086 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -19,7 +19,6 @@ public class LogAggregate implements JsonType { try { return mapper.writeValueAsString(this); } catch (JsonProcessingException e) { - e.printStackTrace(); return null; } } diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java index 36665b0264..a48dfcd5dd 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java @@ -28,7 +28,6 @@ public class LogEvent implements JsonType { try { return mapper.writeValueAsString(this); } catch (JsonProcessingException e) { - e.printStackTrace(); return null; } } From 470d4efde85784f1c9e33d3c2947ca00d6607ca6 Mon Sep 17 00:00:00 2001 From: Adrian Maghear Date: Fri, 16 Oct 2020 13:07:22 +0200 Subject: [PATCH 3/3] [BAEL-3600] add integration test --- netflix-modules/mantis/pom.xml | 14 ++++ .../netflix/mantis/job/LogAggregationJob.java | 9 ++- .../netflix/mantis/job/LogCollectingJob.java | 11 ++- .../netflix/mantis/model/LogAggregate.java | 6 +- .../job/LogAggregationJobIntegrationTest.java | 56 ++++++++++++++ .../job/LogCollectingJobIntegrationTest.java | 73 +++++++++++++++++++ .../netflix/mantis/job/MantisJobTestBase.java | 49 +++++++++++++ 7 files changed, 212 insertions(+), 6 deletions(-) create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml index 48151c142f..5d9611ccdf 100644 --- a/netflix-modules/mantis/pom.xml +++ b/netflix-modules/mantis/pom.xml @@ -52,6 +52,20 @@ 1.18.12 + + org.springframework + spring-webflux + 5.0.9.RELEASE + test + + + + io.projectreactor.netty + reactor-netty + 0.9.12.RELEASE + test + + diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java index 229d11d39d..7fc514deef 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java @@ -9,10 +9,17 @@ import io.mantisrx.runtime.Job; import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.sink.Sink; import io.mantisrx.runtime.sink.Sinks; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +@NoArgsConstructor +@AllArgsConstructor public class LogAggregationJob extends MantisJobProvider { + private Sink sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)); + @Override public Job getJobInstance() { @@ -21,7 +28,7 @@ public class LogAggregationJob extends MantisJobProvider { .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) - .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) + .sink(sink) .metadata(new Metadata.Builder().build()) .create(); diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java index 492f30c43a..34ccf8355a 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -9,18 +9,23 @@ import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.ScalarToScalar; -import lombok.extern.slf4j.Slf4j; +import io.mantisrx.runtime.sink.Sink; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; -@Slf4j +@NoArgsConstructor +@AllArgsConstructor public class LogCollectingJob extends MantisJobProvider { + private Sink sink = new LogSink(); + @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) - .sink(new LogSink()) + .sink(sink) .metadata(new Metadata.Builder().build()) .create(); diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java index 0eeb7ea086..e0e3c4f9fa 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -5,15 +5,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.mantisrx.runtime.codec.JsonType; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; @Getter +@NoArgsConstructor @AllArgsConstructor public class LogAggregate implements JsonType { private static final ObjectMapper mapper = new ObjectMapper(); - private final Integer count; - private final String level; + private Integer count; + private String level; public String toJsonString() { try { diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java new file mode 100644 index 0000000000..b9b16e2146 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java @@ -0,0 +1,56 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.Sinks; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogAggregationJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7382; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> { + logAggregateObservable.subscribe(); + Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable); + })); + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogAggregate.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogAggregate() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() { + LogAggregate logAggregate = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel())); + assertTrue(logAggregate.getCount() > 0); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java new file mode 100644 index 0000000000..87e0c194b5 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java @@ -0,0 +1,73 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import rx.Observable; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogCollectingJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7381; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + + start(new LogCollectingJob(new LogSink() { + + @Override + public void call(Context context, PortRequest portRequest, Observable observable) { + super.call(context, new PortRequest(PORT), observable); + } + + })); + + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogEvent.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogEvent() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogEvent() { + LogEvent logEvent = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel())); + assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage())); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() { + getSinkStream(SINK_URL + "?filter=login") + .take(7) + .toStream().forEach( + logEvent -> assertEquals("login attempt", logEvent.getMessage()) + ); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java new file mode 100644 index 0000000000..89425299a4 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java @@ -0,0 +1,49 @@ +package com.baeldung.netflix.mantis.job; + +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.util.retry.Retry; + +import java.time.Duration; + +public abstract class MantisJobTestBase { + + private static Job jobInstance; + Flux sinkStream; + + public abstract String getSinkUrl(); + public abstract Class getEventType(); + + @BeforeEach + void setUp() { + sinkStream = getSinkStream(getSinkUrl()); + } + + @AfterAll + static void afterAll() { + stopJob(); + } + + protected Flux getSinkStream(String sinkUrl) { + return WebClient.builder().build().get() + .uri(sinkUrl) + .retrieve() + .bodyToFlux(getEventType()) + .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000))); + } + + static void start(MantisJobProvider job) { + jobInstance = job.getJobInstance(); + new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start(); + } + + static void stopJob() { + jobInstance.getLifecycle().shutdown(); + } + +}