[BAEL-3600] set up initial Mantis job

Adrian Maghear 2020-10-03 21:28:40 +02:00
parent d100adc9c5
commit 969cedfecd
12 changed files with 403 additions and 0 deletions

mantis/pom.xml
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>mantis</artifactId>
    <name>Mantis</name>
    <packaging>jar</packaging>
    <description>Sample project for Netflix Mantis</description>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-boot-2</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../../parent-boot-2</relativePath>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.mantisrx</groupId>
            <artifactId>mantis-runtime</artifactId>
            <version>1.2.63</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>net.andreinc.mockneat</groupId>
            <artifactId>mockneat</artifactId>
            <version>0.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>SpringLibReleaseRepo</id>
            <url>https://repo.spring.io/libs-release/</url>
        </repository>
    </repositories>
</project>
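
Note on the mantis-runtime dependency: the slf4j-log4j12 exclusion keeps a second SLF4J binding off the classpath, so logging stays on the Logback binding that spring-boot-starter already provides.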

MantisApplication.java
@@ -0,0 +1,24 @@
package com.baeldung.netflix.mantis;

import com.baeldung.netflix.mantis.job.LogAggregationJob;
import com.baeldung.netflix.mantis.job.LogCollectingJob;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(MantisApplication.class, args);
    }

    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance());
    }
}
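
The runner above launches the aggregation job. To try the collecting job added in the same commit, the run method just hands the other provider to the local executor; a minimal sketch:

    @Override
    public void run(String... args) {
        // swap in the LogCollectingJob from this commit; everything else stays the same
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }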

LogAggregationJob.java
@@ -0,0 +1,30 @@
package com.baeldung.netflix.mantis.job;

import com.baeldung.netflix.mantis.model.LogAggregate;
import com.baeldung.netflix.mantis.source.RandomLogSource;
import com.baeldung.netflix.mantis.stage.CountLogStage;
import com.baeldung.netflix.mantis.stage.GroupLogStage;
import com.baeldung.netflix.mantis.stage.TransformLogStage;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.sink.Sinks;

public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {
        return MantisJob
                .source(new RandomLogSource())
                // stage 1: raw "index#level#message" strings -> LogEvent
                .stage(new TransformLogStage(), TransformLogStage.stageConfig())
                // stage 2: group events by log level
                .stage(new GroupLogStage(), GroupLogStage.config())
                // stage 3: count events per level within a time window
                .stage(new CountLogStage(), CountLogStage.config())
                .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
                .metadata(new Metadata.Builder().build())
                .create();
    }
}
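
Each LogAggregate reaching the SSE sink is encoded with toJsonString. A quick sketch of what one serialized aggregate looks like (the exact field order depends on Jackson's defaults):

    // hypothetical illustration of the SSE encoder's output for one aggregate
    LogAggregate aggregate = new LogAggregate(3, "ERROR");
    String json = aggregate.toJsonString(); // e.g. {"count":3,"level":"ERROR"}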

LogCollectingJob.java
@@ -0,0 +1,31 @@
package com.baeldung.netflix.mantis.job;

import com.baeldung.netflix.mantis.model.LogEvent;
import com.baeldung.netflix.mantis.sink.LogSink;
import com.baeldung.netflix.mantis.source.RandomLogSource;
import com.baeldung.netflix.mantis.stage.TransformLogStage;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.sink.Sinks;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
                .source(new RandomLogSource())
                .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
                // .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
                .sink(new LogSink())
                .metadata(new Metadata.Builder().build())
                .create();
    }
}

LogAggregate.java
@@ -0,0 +1,27 @@
package com.baeldung.netflix.mantis.model;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.runtime.codec.JsonType;
import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class LogAggregate implements JsonType {

    private static final ObjectMapper mapper = new ObjectMapper();

    private final Integer count;
    private final String level;

    public String toJsonString() {
        try {
            return mapper.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

LogEvent.java
@@ -0,0 +1,36 @@
package com.baeldung.netflix.mantis.model;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.runtime.codec.JsonType;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
public class LogEvent implements JsonType {

    private static final ObjectMapper mapper = new ObjectMapper();

    private Long index;
    private String level;
    private String message;

    public LogEvent(String[] parts) {
        this.index = Long.valueOf(parts[0]);
        this.level = parts[1];
        this.message = parts[2];
    }

    public String toJsonString() {
        try {
            return mapper.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

LogSink.java
@@ -0,0 +1,37 @@
package com.baeldung.netflix.mantis.sink;

import com.baeldung.netflix.mantis.model.LogEvent;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.sink.predicate.Predicate;
import rx.Observable;

public class LogSink implements Sink<LogEvent> {

    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
                .withEncoder(LogEvent::toJsonString)
                .withPredicate(filterByLogMessage())
                .build();
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }

    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
                parameters -> {
                    if (parameters != null && parameters.containsKey("filter")) {
                        return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
                    }
                    return logEvent -> true;
                });
    }
}
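
The predicate above is evaluated per SSE connection: when a client connects to the sink's endpoint with a filter query parameter, only events whose message contains that value are pushed to it; without the parameter, every event passes through.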

RandomLogSource.java
@@ -0,0 +1,46 @@
package com.baeldung.netflix.mantis.source;

import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import lombok.extern.slf4j.Slf4j;
import net.andreinc.mockneat.MockNeat;
import rx.Observable;

import java.util.concurrent.TimeUnit;

@Slf4j
public class RandomLogSource implements Source<String> {

    private MockNeat mockDataGenerator;

    @Override
    public void init(Context context, Index index) {
        mockDataGenerator = MockNeat.threadLocal();
    }

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
                Observable
                        .interval(250, TimeUnit.MILLISECONDS)
                        .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // "probabilites" is the actual (misspelled) method name in the MockNeat API
        String level = mockDataGenerator.probabilites(String.class)
                .add(0.5, "INFO")
                .add(0.3, "WARN")
                .add(0.2, "ERROR")
                .get();
        String message = mockDataGenerator.probabilites(String.class)
                .add(0.5, "login attempt")
                .add(0.5, "user created")
                .get();
        return tick + "#" + level + "#" + message;
    }
}
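
Every 250 ms this source emits one raw line in the index#level#message format that TransformLogStage later parses, for example:

    42#INFO#login attempt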

CountLogStage.java
@@ -0,0 +1,58 @@
package com.baeldung.netflix.mantis.stage;

import com.baeldung.netflix.mantis.model.LogAggregate;
import com.baeldung.netflix.mantis.model.LogEvent;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.GroupToScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import rx.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        duration = (int) context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
                .window(duration, TimeUnit.MILLISECONDS)
                .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
                        .flatMap(group -> group.reduce(0, (count, value) -> count + 1)
                                .map(count -> new LogAggregate(count, group.getKey()))));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config() {
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
                .description("sum events for a log level")
                .codec(JacksonCodecs.pojo(LogAggregate.class))
                .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(new IntParameter()
                .name("LogAggregationDuration")
                .description("window size for aggregation in milliseconds")
                .validator(Validators.range(100, 10000))
                .defaultValue(5000)
                .build());
        return params;
    }
}
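
The LogAggregationDuration parameter defined above (default 5000 ms) can be overridden when the job is launched. A minimal sketch, assuming io.mantisrx.runtime.parameter.Parameter and the varargs execute overload of LocalJobExecutorNetworked:

    // hypothetical override of the aggregation window when running the job locally
    LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance(),
            new Parameter("LogAggregationDuration", "2000"));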

GroupLogStage.java
@@ -0,0 +1,25 @@
package com.baeldung.netflix.mantis.stage;

import com.baeldung.netflix.mantis.model.LogEvent;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.ToGroupComputation;
import rx.Observable;

public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config() {
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
                .description("Group event data by level")
                .codec(JacksonCodecs.pojo(LogEvent.class))
                .concurrentInput();
    }
}

TransformLogStage.java
@@ -0,0 +1,24 @@
package com.baeldung.netflix.mantis.stage;

import com.baeldung.netflix.mantis.model.LogEvent;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.ScalarComputation;
import rx.Observable;

public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
                .map(log -> log.split("#"))
                .filter(parts -> parts.length == 3)
                .map(LogEvent::new);
    }

    public static ScalarToScalar.Config<String, LogEvent> stageConfig() {
        return new ScalarToScalar.Config<String, LogEvent>()
                .codec(JacksonCodecs.pojo(LogEvent.class));
    }
}
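
As a stand-alone illustration of the mapping this stage performs, a three-part raw line from RandomLogSource becomes a LogEvent through the String[] constructor:

    // hypothetical illustration of the stage's mapping logic
    String rawLine = "42#INFO#login attempt";
    LogEvent event = new LogEvent(rawLine.split("#"));
    // event.getIndex() == 42, event.getLevel().equals("INFO"), event.getMessage().equals("login attempt")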

pom.xml (parent module list)
@@ -15,6 +15,7 @@
    <modules>
        <module>genie</module>
        <module>mantis</module>
    </modules>
</project>