Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
e521146f8f
|
@ -0,0 +1,6 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
public interface Animal {
|
||||
|
||||
String getName();
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
public class Box<T> {
|
||||
|
||||
private T content;
|
||||
|
||||
public T getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public void setContent(T content) {
|
||||
this.content = content;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
public class Frog extends Reptile {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return super.getName() + ": Frog";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
public class Mammal implements Animal {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "Mammal";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
public class Reptile implements Animal {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "Reptile";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class CheckedCastsUnitTest {
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenBaseTypeVariableReferencingChildInstance_whenCastToIncompatibleSubtype_thenClassCastException() {
|
||||
Animal animal = new Frog();
|
||||
|
||||
//A checked downcast to Mammal is incompatible from Frog because Frog is not a subtype of Mammal.
|
||||
Mammal mammal = (Mammal) animal;
|
||||
}
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenBaseTypeVariableReferencingChildInstance_whenCastToIncompatibleInterface_thenClassCastException() {
|
||||
Animal animal = new Frog();
|
||||
|
||||
//A checked cast to Serializable is incompatible from Frog because Frog is not a subtype of Serializable.
|
||||
Serializable serial = (Serializable) animal;
|
||||
}
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenObjectVariableReferencingPrimitiveArray_whenCastToBoxedTypeArray_thenClassCastException() {
|
||||
Object primitives = new int[1];
|
||||
|
||||
//A checked cast to Integer[] is incompatible from primitive arrays. Auto-boxing does not work for arrays.
|
||||
Integer[] integers = (Integer[]) primitives;
|
||||
}
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenObjectVariableReferencingPrimitiveArray_whenCastToPromotedTypeArray_thenClassCastException() {
|
||||
Object primitives = new int[1];
|
||||
|
||||
//A checked cast to long[] is incompatible from int[]. Type promotion does not work for arrays.
|
||||
long[] longs = (long[]) primitives;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class GenericConversionUnitTest {
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenIncompatibleType_whenConvertInstanceOfObject_thenClassCastException() {
|
||||
// Should have been null, but due to type erasure, inside convertInstanceOfObject,
|
||||
// it will attempt to cast to Object instead of String, so it casts to Object, which is always possible.
|
||||
String shouldBeNull = convertInstanceOfObject(123);
|
||||
}
|
||||
|
||||
public static <T> T convertInstanceOfObject(Object o) {
|
||||
try {
|
||||
return (T) o; // Casts to Object due to type erasure
|
||||
} catch (ClassCastException e) {
|
||||
return null; // Will never reach this
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package com.baeldung.exceptions.classcastexception;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class UncheckedConversionUnitTest {
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenPollutedGenericType_whenGetProperty_thenClassCastException() {
|
||||
Box<Long> originalBox = new Box<>();
|
||||
Box raw = originalBox;
|
||||
raw.setContent(2.5);
|
||||
Box<Long> bound = (Box<Long>) raw;
|
||||
|
||||
//An incompatible element was found in the raw box.
|
||||
Long content = bound.getContent();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>mantis</artifactId>
|
||||
<name>Mantis</name>
|
||||
<packaging>jar</packaging>
|
||||
<description>Sample project for Netflix Mantis</description>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-boot-2</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../../parent-boot-2</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.mantisrx</groupId>
|
||||
<artifactId>mantis-runtime</artifactId>
|
||||
<version>1.2.63</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>2.10.2</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.andreinc.mockneat</groupId>
|
||||
<artifactId>mockneat</artifactId>
|
||||
<version>0.3.8</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.12</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-webflux</artifactId>
|
||||
<version>5.0.9.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.projectreactor.netty</groupId>
|
||||
<artifactId>reactor-netty</artifactId>
|
||||
<version>0.9.12.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>SpringLibReleaseRepo</id>
|
||||
<url>https://repo.spring.io/libs-release/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,23 @@
|
|||
package com.baeldung.netflix.mantis;
|
||||
|
||||
import com.baeldung.netflix.mantis.job.LogAggregationJob;
|
||||
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@Slf4j
|
||||
@SpringBootApplication
|
||||
public class MantisApplication implements CommandLineRunner {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(MantisApplication.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) {
|
||||
LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.baeldung.netflix.mantis.job;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogAggregate;
|
||||
import com.baeldung.netflix.mantis.source.RandomLogSource;
|
||||
import com.baeldung.netflix.mantis.stage.CountLogStage;
|
||||
import com.baeldung.netflix.mantis.stage.GroupLogStage;
|
||||
import com.baeldung.netflix.mantis.stage.TransformLogStage;
|
||||
import io.mantisrx.runtime.Job;
|
||||
import io.mantisrx.runtime.MantisJob;
|
||||
import io.mantisrx.runtime.MantisJobProvider;
|
||||
import io.mantisrx.runtime.Metadata;
|
||||
import io.mantisrx.runtime.sink.Sink;
|
||||
import io.mantisrx.runtime.sink.Sinks;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogAggregationJob extends MantisJobProvider<LogAggregate> {
|
||||
|
||||
private Sink<LogAggregate> sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString));
|
||||
|
||||
@Override
|
||||
public Job<LogAggregate> getJobInstance() {
|
||||
|
||||
return MantisJob
|
||||
.source(new RandomLogSource())
|
||||
.stage(new TransformLogStage(), TransformLogStage.stageConfig())
|
||||
.stage(new GroupLogStage(), GroupLogStage.config())
|
||||
.stage(new CountLogStage(), CountLogStage.config())
|
||||
.sink(sink)
|
||||
.metadata(new Metadata.Builder().build())
|
||||
.create();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.baeldung.netflix.mantis.job;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import com.baeldung.netflix.mantis.sink.LogSink;
|
||||
import com.baeldung.netflix.mantis.source.RandomLogSource;
|
||||
import com.baeldung.netflix.mantis.stage.TransformLogStage;
|
||||
import io.mantisrx.runtime.Job;
|
||||
import io.mantisrx.runtime.MantisJob;
|
||||
import io.mantisrx.runtime.MantisJobProvider;
|
||||
import io.mantisrx.runtime.Metadata;
|
||||
import io.mantisrx.runtime.ScalarToScalar;
|
||||
import io.mantisrx.runtime.sink.Sink;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogCollectingJob extends MantisJobProvider<LogEvent> {
|
||||
|
||||
private Sink<LogEvent> sink = new LogSink();
|
||||
|
||||
@Override
|
||||
public Job<LogEvent> getJobInstance() {
|
||||
|
||||
return MantisJob
|
||||
.source(new RandomLogSource())
|
||||
.stage(new TransformLogStage(), new ScalarToScalar.Config<>())
|
||||
.sink(sink)
|
||||
.metadata(new Metadata.Builder().build())
|
||||
.create();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package com.baeldung.netflix.mantis.model;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.mantisrx.runtime.codec.JsonType;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogAggregate implements JsonType {
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private Integer count;
|
||||
private String level;
|
||||
|
||||
public String toJsonString() {
|
||||
try {
|
||||
return mapper.writeValueAsString(this);
|
||||
} catch (JsonProcessingException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.baeldung.netflix.mantis.model;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.mantisrx.runtime.codec.JsonType;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
public class LogEvent implements JsonType {
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private Long index;
|
||||
private String level;
|
||||
private String message;
|
||||
|
||||
public LogEvent(String[] parts) {
|
||||
this.index = Long.valueOf(parts[0]);
|
||||
this.level = parts[1];
|
||||
this.message = parts[2];
|
||||
}
|
||||
|
||||
public String toJsonString() {
|
||||
try {
|
||||
return mapper.writeValueAsString(this);
|
||||
} catch (JsonProcessingException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.baeldung.netflix.mantis.sink;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.PortRequest;
|
||||
import io.mantisrx.runtime.sink.SelfDocumentingSink;
|
||||
import io.mantisrx.runtime.sink.ServerSentEventsSink;
|
||||
import io.mantisrx.runtime.sink.Sink;
|
||||
import io.mantisrx.runtime.sink.predicate.Predicate;
|
||||
import rx.Observable;
|
||||
|
||||
public class LogSink implements Sink<LogEvent> {
|
||||
|
||||
@Override
|
||||
public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
|
||||
|
||||
SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
|
||||
.withEncoder(LogEvent::toJsonString)
|
||||
.withPredicate(filterByLogMessage())
|
||||
.build();
|
||||
|
||||
logEventObservable.subscribe();
|
||||
|
||||
sink.call(context, portRequest, logEventObservable);
|
||||
}
|
||||
|
||||
private Predicate<LogEvent> filterByLogMessage() {
|
||||
return new Predicate<>("filter by message",
|
||||
parameters -> {
|
||||
if (parameters != null && parameters.containsKey("filter")) {
|
||||
return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
|
||||
}
|
||||
return logEvent -> true;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.baeldung.netflix.mantis.source;
|
||||
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.source.Index;
|
||||
import io.mantisrx.runtime.source.Source;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.andreinc.mockneat.MockNeat;
|
||||
import rx.Observable;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
public class RandomLogSource implements Source<String> {
|
||||
|
||||
private MockNeat mockDataGenerator;
|
||||
|
||||
@Override
|
||||
public void init(Context context, Index index) {
|
||||
mockDataGenerator = MockNeat.threadLocal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Observable<Observable<String>> call(Context context, Index index) {
|
||||
return Observable.just(
|
||||
Observable
|
||||
.interval(250, TimeUnit.MILLISECONDS)
|
||||
.map(this::createRandomLogEvent));
|
||||
}
|
||||
|
||||
private String createRandomLogEvent(Long tick) {
|
||||
String level = mockDataGenerator.probabilites(String.class)
|
||||
.add(0.5, "INFO")
|
||||
.add(0.3, "WARN")
|
||||
.add(0.2, "ERROR")
|
||||
.get();
|
||||
|
||||
String message = mockDataGenerator.probabilites(String.class)
|
||||
.add(0.5, "login attempt")
|
||||
.add(0.5, "user created")
|
||||
.get();
|
||||
|
||||
return tick + "#" + level + "#" + message;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package com.baeldung.netflix.mantis.stage;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogAggregate;
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import io.mantisrx.common.MantisGroup;
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.GroupToScalar;
|
||||
import io.mantisrx.runtime.codec.JacksonCodecs;
|
||||
import io.mantisrx.runtime.computation.GroupToScalarComputation;
|
||||
import io.mantisrx.runtime.parameter.ParameterDefinition;
|
||||
import io.mantisrx.runtime.parameter.type.IntParameter;
|
||||
import io.mantisrx.runtime.parameter.validator.Validators;
|
||||
import rx.Observable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {
|
||||
|
||||
private int duration;
|
||||
|
||||
@Override
|
||||
public void init(Context context) {
|
||||
duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
|
||||
return mantisGroup
|
||||
.window(duration, TimeUnit.MILLISECONDS)
|
||||
.flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
|
||||
.flatMap(group -> group.reduce(0, (count, value) -> count = count + 1)
|
||||
.map((count) -> new LogAggregate(count, group.getKey()))
|
||||
));
|
||||
}
|
||||
|
||||
public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
|
||||
return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
|
||||
.description("sum events for a log level")
|
||||
.codec(JacksonCodecs.pojo(LogAggregate.class))
|
||||
.withParameters(getParameters());
|
||||
}
|
||||
|
||||
public static List<ParameterDefinition<?>> getParameters() {
|
||||
List<ParameterDefinition<?>> params = new ArrayList<>();
|
||||
|
||||
params.add(new IntParameter()
|
||||
.name("LogAggregationDuration")
|
||||
.description("window size for aggregation in milliseconds")
|
||||
.validator(Validators.range(100, 10000))
|
||||
.defaultValue(5000)
|
||||
.build()) ;
|
||||
|
||||
return params;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package com.baeldung.netflix.mantis.stage;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import io.mantisrx.common.MantisGroup;
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.ScalarToGroup;
|
||||
import io.mantisrx.runtime.codec.JacksonCodecs;
|
||||
import io.mantisrx.runtime.computation.ToGroupComputation;
|
||||
import rx.Observable;
|
||||
|
||||
public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {
|
||||
|
||||
@Override
|
||||
public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
|
||||
return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
|
||||
}
|
||||
|
||||
public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
|
||||
return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
|
||||
.description("Group event data by level")
|
||||
.codec(JacksonCodecs.pojo(LogEvent.class))
|
||||
.concurrentInput();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package com.baeldung.netflix.mantis.stage;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.ScalarToScalar;
|
||||
import io.mantisrx.runtime.codec.JacksonCodecs;
|
||||
import io.mantisrx.runtime.computation.ScalarComputation;
|
||||
import rx.Observable;
|
||||
|
||||
public class TransformLogStage implements ScalarComputation<String, LogEvent> {
|
||||
|
||||
@Override
|
||||
public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
|
||||
return logEntry
|
||||
.map(log -> log.split("#"))
|
||||
.filter(parts -> parts.length == 3)
|
||||
.map(LogEvent::new);
|
||||
}
|
||||
|
||||
public static ScalarToScalar.Config<String, LogEvent> stageConfig() {
|
||||
return new ScalarToScalar.Config<String, LogEvent>()
|
||||
.codec(JacksonCodecs.pojo(LogEvent.class));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
package com.baeldung.netflix.mantis.job;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogAggregate;
|
||||
import io.mantisrx.runtime.PortRequest;
|
||||
import io.mantisrx.runtime.sink.Sinks;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Optional.of;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class LogAggregationJobIntegrationTest extends MantisJobTestBase<LogAggregate> {
|
||||
|
||||
private final static int PORT = 7382;
|
||||
private final static String SINK_URL = "http://localhost:" + PORT;
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> {
|
||||
logAggregateObservable.subscribe();
|
||||
Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSinkUrl() {
|
||||
return SINK_URL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<LogAggregate> getEventType() {
|
||||
return LogAggregate.class;
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() {
|
||||
assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveLogAggregate() {
|
||||
assertNotNull(sinkStream.take(1).blockFirst());
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() {
|
||||
LogAggregate logAggregate = sinkStream.take(1).blockFirst();
|
||||
|
||||
assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel()));
|
||||
assertTrue(logAggregate.getCount() > 0);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package com.baeldung.netflix.mantis.job;
|
||||
|
||||
import com.baeldung.netflix.mantis.model.LogEvent;
|
||||
import com.baeldung.netflix.mantis.sink.LogSink;
|
||||
import io.mantisrx.runtime.Context;
|
||||
import io.mantisrx.runtime.PortRequest;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import rx.Observable;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Optional.of;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class LogCollectingJobIntegrationTest extends MantisJobTestBase<LogEvent> {
|
||||
|
||||
private final static int PORT = 7381;
|
||||
private final static String SINK_URL = "http://localhost:" + PORT;
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
|
||||
start(new LogCollectingJob(new LogSink() {
|
||||
|
||||
@Override
|
||||
public void call(Context context, PortRequest portRequest, Observable<LogEvent> observable) {
|
||||
super.call(context, new PortRequest(PORT), observable);
|
||||
}
|
||||
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSinkUrl() {
|
||||
return SINK_URL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<LogEvent> getEventType() {
|
||||
return LogEvent.class;
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() {
|
||||
assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveLogEvent() {
|
||||
assertNotNull(sinkStream.take(1).blockFirst());
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveValidLogEvent() {
|
||||
LogEvent logEvent = sinkStream.take(1).blockFirst();
|
||||
|
||||
assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel()));
|
||||
assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() {
|
||||
getSinkStream(SINK_URL + "?filter=login")
|
||||
.take(7)
|
||||
.toStream().forEach(
|
||||
logEvent -> assertEquals("login attempt", logEvent.getMessage())
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.baeldung.netflix.mantis.job;
|
||||
|
||||
import io.mantisrx.runtime.Job;
|
||||
import io.mantisrx.runtime.MantisJobProvider;
|
||||
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public abstract class MantisJobTestBase<T> {
|
||||
|
||||
private static Job jobInstance;
|
||||
Flux<T> sinkStream;
|
||||
|
||||
public abstract String getSinkUrl();
|
||||
public abstract Class<T> getEventType();
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
sinkStream = getSinkStream(getSinkUrl());
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void afterAll() {
|
||||
stopJob();
|
||||
}
|
||||
|
||||
protected Flux<T> getSinkStream(String sinkUrl) {
|
||||
return WebClient.builder().build().get()
|
||||
.uri(sinkUrl)
|
||||
.retrieve()
|
||||
.bodyToFlux(getEventType())
|
||||
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000)));
|
||||
}
|
||||
|
||||
static <T> void start(MantisJobProvider<T> job) {
|
||||
jobInstance = job.getJobInstance();
|
||||
new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start();
|
||||
}
|
||||
|
||||
static void stopJob() {
|
||||
jobInstance.getLifecycle().shutdown();
|
||||
}
|
||||
|
||||
}
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
<modules>
|
||||
<module>genie</module>
|
||||
<module>mantis</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
Loading…
Reference in New Issue