[BAEL-3600] add integration test

This commit is contained in:
Adrian Maghear 2020-10-16 13:07:22 +02:00
parent 795bcbe2b7
commit 470d4efde8
7 changed files with 212 additions and 6 deletions

View File

@ -52,6 +52,20 @@
<version>1.18.12</version> <version>1.18.12</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.0.9.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.12.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<repositories> <repositories>

View File

@ -9,10 +9,17 @@ import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.sink.Sinks; import io.mantisrx.runtime.sink.Sinks;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class LogAggregationJob extends MantisJobProvider<LogAggregate> { public class LogAggregationJob extends MantisJobProvider<LogAggregate> {
private Sink<LogAggregate> sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString));
@Override @Override
public Job<LogAggregate> getJobInstance() { public Job<LogAggregate> getJobInstance() {
@ -21,7 +28,7 @@ public class LogAggregationJob extends MantisJobProvider<LogAggregate> {
.stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new TransformLogStage(), TransformLogStage.stageConfig())
.stage(new GroupLogStage(), GroupLogStage.config()) .stage(new GroupLogStage(), GroupLogStage.config())
.stage(new CountLogStage(), CountLogStage.config()) .stage(new CountLogStage(), CountLogStage.config())
.sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .sink(sink)
.metadata(new Metadata.Builder().build()) .metadata(new Metadata.Builder().build())
.create(); .create();

View File

@ -9,18 +9,23 @@ import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.ScalarToScalar; import io.mantisrx.runtime.ScalarToScalar;
import lombok.extern.slf4j.Slf4j; import io.mantisrx.runtime.sink.Sink;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@Slf4j @NoArgsConstructor
@AllArgsConstructor
public class LogCollectingJob extends MantisJobProvider<LogEvent> { public class LogCollectingJob extends MantisJobProvider<LogEvent> {
private Sink<LogEvent> sink = new LogSink();
@Override @Override
public Job<LogEvent> getJobInstance() { public Job<LogEvent> getJobInstance() {
return MantisJob return MantisJob
.source(new RandomLogSource()) .source(new RandomLogSource())
.stage(new TransformLogStage(), new ScalarToScalar.Config<>()) .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
.sink(new LogSink()) .sink(sink)
.metadata(new Metadata.Builder().build()) .metadata(new Metadata.Builder().build())
.create(); .create();

View File

@ -5,15 +5,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.runtime.codec.JsonType; import io.mantisrx.runtime.codec.JsonType;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor;
@Getter @Getter
@NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class LogAggregate implements JsonType { public class LogAggregate implements JsonType {
private static final ObjectMapper mapper = new ObjectMapper(); private static final ObjectMapper mapper = new ObjectMapper();
private final Integer count; private Integer count;
private final String level; private String level;
public String toJsonString() { public String toJsonString() {
try { try {

View File

@ -0,0 +1,56 @@
package com.baeldung.netflix.mantis.job;
import com.baeldung.netflix.mantis.model.LogAggregate;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.sink.Sinks;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static java.util.Arrays.asList;
import static java.util.Optional.of;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class LogAggregationJobIntegrationTest extends MantisJobTestBase<LogAggregate> {
private final static int PORT = 7382;
private final static String SINK_URL = "http://localhost:" + PORT;
@BeforeAll
static void beforeAll() {
start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> {
logAggregateObservable.subscribe();
Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable);
}));
}
@Override
public String getSinkUrl() {
return SINK_URL;
}
@Override
public Class<LogAggregate> getEventType() {
return LogAggregate.class;
}
@Test
void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() {
assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
}
@Test
void whenReadingFromSink_thenShouldRetrieveLogAggregate() {
assertNotNull(sinkStream.take(1).blockFirst());
}
@Test
void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() {
LogAggregate logAggregate = sinkStream.take(1).blockFirst();
assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel()));
assertTrue(logAggregate.getCount() > 0);
}
}

View File

@ -0,0 +1,73 @@
package com.baeldung.netflix.mantis.job;
import com.baeldung.netflix.mantis.model.LogEvent;
import com.baeldung.netflix.mantis.sink.LogSink;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.PortRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import rx.Observable;
import static java.util.Arrays.asList;
import static java.util.Optional.of;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class LogCollectingJobIntegrationTest extends MantisJobTestBase<LogEvent> {
private final static int PORT = 7381;
private final static String SINK_URL = "http://localhost:" + PORT;
@BeforeAll
static void beforeAll() {
start(new LogCollectingJob(new LogSink() {
@Override
public void call(Context context, PortRequest portRequest, Observable<LogEvent> observable) {
super.call(context, new PortRequest(PORT), observable);
}
}));
}
@Override
public String getSinkUrl() {
return SINK_URL;
}
@Override
public Class<LogEvent> getEventType() {
return LogEvent.class;
}
@Test
void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() {
assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
}
@Test
void whenReadingFromSink_thenShouldRetrieveLogEvent() {
assertNotNull(sinkStream.take(1).blockFirst());
}
@Test
void whenReadingFromSink_thenShouldRetrieveValidLogEvent() {
LogEvent logEvent = sinkStream.take(1).blockFirst();
assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel()));
assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage()));
}
@Test
void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() {
getSinkStream(SINK_URL + "?filter=login")
.take(7)
.toStream().forEach(
logEvent -> assertEquals("login attempt", logEvent.getMessage())
);
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.netflix.mantis.job;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import java.time.Duration;
public abstract class MantisJobTestBase<T> {
private static Job jobInstance;
Flux<T> sinkStream;
public abstract String getSinkUrl();
public abstract Class<T> getEventType();
@BeforeEach
void setUp() {
sinkStream = getSinkStream(getSinkUrl());
}
@AfterAll
static void afterAll() {
stopJob();
}
protected Flux<T> getSinkStream(String sinkUrl) {
return WebClient.builder().build().get()
.uri(sinkUrl)
.retrieve()
.bodyToFlux(getEventType())
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000)));
}
static <T> void start(MantisJobProvider<T> job) {
jobInstance = job.getJobInstance();
new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start();
}
static void stopJob() {
jobInstance.getLifecycle().shutdown();
}
}