diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml new file mode 100644 index 0000000000..48151c142f --- /dev/null +++ b/netflix-modules/mantis/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + mantis + Mantis + jar + Sample project for Netflix Mantis + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../../parent-boot-2 + + + + + + org.springframework.boot + spring-boot-starter + 2.1.3.RELEASE + + + + io.mantisrx + mantis-runtime + 1.2.63 + + + org.slf4j + slf4j-log4j12 + + + + + + com.fasterxml.jackson.core + jackson-databind + 2.10.2 + + + + net.andreinc.mockneat + mockneat + 0.3.8 + + + + org.projectlombok + lombok + 1.18.12 + + + + + + + SpringLibReleaseRepo + https://repo.spring.io/libs-release/ + + + + diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java new file mode 100644 index 0000000000..ad2b7f9aed --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java @@ -0,0 +1,24 @@ +package com.baeldung.netflix.mantis; + +import com.baeldung.netflix.mantis.job.LogAggregationJob; +import com.baeldung.netflix.mantis.job.LogCollectingJob; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@SpringBootApplication +public class MantisApplication implements CommandLineRunner { + + public static void main(String[] args) { + SpringApplication.run(MantisApplication.class, args); + } + + @Override + public void run(String... args) { + LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance()); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java new file mode 100644 index 0000000000..229d11d39d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java @@ -0,0 +1,30 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.CountLogStage; +import com.baeldung.netflix.mantis.stage.GroupLogStage; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.sink.Sinks; + +public class LogAggregationJob extends MantisJobProvider { + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), TransformLogStage.stageConfig()) + .stage(new GroupLogStage(), GroupLogStage.config()) + .stage(new CountLogStage(), CountLogStage.config()) + .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java new file mode 100644 index 0000000000..9bb6ae7f6d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -0,0 +1,31 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.sink.Sinks; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LogCollectingJob extends MantisJobProvider { + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) +// .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) + .sink(new LogSink()) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java new file mode 100644 index 0000000000..a9f1818b9a --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -0,0 +1,27 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class LogAggregate implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private final Integer count; + private final String level; + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java new file mode 100644 index 0000000000..36665b0264 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java @@ -0,0 +1,36 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class LogEvent implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private Long index; + private String level; + private String message; + + public LogEvent(String[] parts) { + this.index = Long.valueOf(parts[0]); + this.level = parts[1]; + this.message = parts[2]; + } + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java new file mode 100644 index 0000000000..ae2177bf87 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java @@ -0,0 +1,37 @@ +package com.baeldung.netflix.mantis.sink; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.SelfDocumentingSink; +import io.mantisrx.runtime.sink.ServerSentEventsSink; +import io.mantisrx.runtime.sink.Sink; +import io.mantisrx.runtime.sink.predicate.Predicate; +import rx.Observable; + +public class LogSink implements Sink { + + @Override + public void call(Context context, PortRequest portRequest, Observable logEventObservable) { + + SelfDocumentingSink sink = new ServerSentEventsSink.Builder() + .withEncoder(LogEvent::toJsonString) + .withPredicate(filterByLogMessage()) + .build(); + + logEventObservable.subscribe(); + + sink.call(context, portRequest, logEventObservable); + } + + private Predicate filterByLogMessage() { + return new Predicate<>("filter by message", + parameters -> { + if (parameters != null && parameters.containsKey("filter")) { + return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); + } + return logEvent -> true; + }); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java new file mode 100644 index 0000000000..fe607d9866 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java @@ -0,0 +1,46 @@ +package com.baeldung.netflix.mantis.source; + +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.source.Index; +import io.mantisrx.runtime.source.Source; +import lombok.extern.slf4j.Slf4j; +import net.andreinc.mockneat.MockNeat; +import rx.Observable; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class RandomLogSource implements Source { + + private MockNeat mockDataGenerator; + + @Override + public void init(Context context, Index index) { + mockDataGenerator = MockNeat.threadLocal(); + } + + @Override + public Observable> call(Context context, Index index) { + return Observable.just( + Observable + .interval(250, TimeUnit.MILLISECONDS) + .map(this::createRandomLogEvent)); + } + + private String createRandomLogEvent(Long tick) { + String level = mockDataGenerator.probabilites(String.class) + .add(0.5, "INFO") + .add(0.3, "WARN") + .add(0.2, "ERROR") + .get(); + + String message = mockDataGenerator.probabilites(String.class) + .add(0.5, "login attempt") + .add(0.5, "user created") + .get(); + + return tick + "#" + level + "#" + message; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java new file mode 100644 index 0000000000..5f02d783d0 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java @@ -0,0 +1,58 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.GroupToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.GroupToScalarComputation; +import io.mantisrx.runtime.parameter.ParameterDefinition; +import io.mantisrx.runtime.parameter.type.IntParameter; +import io.mantisrx.runtime.parameter.validator.Validators; +import rx.Observable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class CountLogStage implements GroupToScalarComputation { + + private int duration; + + @Override + public void init(Context context) { + duration = (int)context.getParameters().get("LogAggregationDuration", 1000); + } + + @Override + public Observable call(Context context, Observable> mantisGroup) { + return mantisGroup + .window(duration, TimeUnit.MILLISECONDS) + .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) + .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) + .map((count) -> new LogAggregate(count, group.getKey())) + )); + } + + public static GroupToScalar.Config config(){ + return new GroupToScalar.Config() + .description("sum events for a log level") + .codec(JacksonCodecs.pojo(LogAggregate.class)) + .withParameters(getParameters()); + } + + public static List> getParameters() { + List> params = new ArrayList<>(); + + params.add(new IntParameter() + .name("LogAggregationDuration") + .description("window size for aggregation in milliseconds") + .validator(Validators.range(100, 10000)) + .defaultValue(5000) + .build()) ; + + return params; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java new file mode 100644 index 0000000000..f21c4aba7d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java @@ -0,0 +1,25 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToGroup; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ToGroupComputation; +import rx.Observable; + +public class GroupLogStage implements ToGroupComputation { + + @Override + public Observable> call(Context context, Observable logEvent) { + return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log)); + } + + public static ScalarToGroup.Config config(){ + return new ScalarToGroup.Config() + .description("Group event data by level") + .codec(JacksonCodecs.pojo(LogEvent.class)) + .concurrentInput(); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java new file mode 100644 index 0000000000..33e6567d13 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java @@ -0,0 +1,24 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ScalarComputation; +import rx.Observable; + +public class TransformLogStage implements ScalarComputation { + + @Override + public Observable call(Context context, Observable logEntry) { + return logEntry + .map(log -> log.split("#")) + .filter(parts -> parts.length == 3) + .map(LogEvent::new); + } + + public static ScalarToScalar.Config stageConfig() { + return new ScalarToScalar.Config() + .codec(JacksonCodecs.pojo(LogEvent.class)); + } +} diff --git a/netflix-modules/pom.xml b/netflix-modules/pom.xml index 9ed22498d8..538126fb34 100644 --- a/netflix-modules/pom.xml +++ b/netflix-modules/pom.xml @@ -15,6 +15,7 @@ genie + mantis \ No newline at end of file