From 470d4efde85784f1c9e33d3c2947ca00d6607ca6 Mon Sep 17 00:00:00 2001 From: Adrian Maghear Date: Fri, 16 Oct 2020 13:07:22 +0200 Subject: [PATCH] [BAEL-3600] add integration test --- netflix-modules/mantis/pom.xml | 14 ++++ .../netflix/mantis/job/LogAggregationJob.java | 9 ++- .../netflix/mantis/job/LogCollectingJob.java | 11 ++- .../netflix/mantis/model/LogAggregate.java | 6 +- .../job/LogAggregationJobIntegrationTest.java | 56 ++++++++++++++ .../job/LogCollectingJobIntegrationTest.java | 73 +++++++++++++++++++ .../netflix/mantis/job/MantisJobTestBase.java | 49 +++++++++++++ 7 files changed, 212 insertions(+), 6 deletions(-) create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java create mode 100644 netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml index 48151c142f..5d9611ccdf 100644 --- a/netflix-modules/mantis/pom.xml +++ b/netflix-modules/mantis/pom.xml @@ -52,6 +52,20 @@ 1.18.12 + + org.springframework + spring-webflux + 5.0.9.RELEASE + test + + + + io.projectreactor.netty + reactor-netty + 0.9.12.RELEASE + test + + diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java index 229d11d39d..7fc514deef 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java @@ -9,10 +9,17 @@ import io.mantisrx.runtime.Job; import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.sink.Sink; import io.mantisrx.runtime.sink.Sinks; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +@NoArgsConstructor +@AllArgsConstructor public class LogAggregationJob extends MantisJobProvider { + private Sink sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)); + @Override public Job getJobInstance() { @@ -21,7 +28,7 @@ public class LogAggregationJob extends MantisJobProvider { .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) - .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) + .sink(sink) .metadata(new Metadata.Builder().build()) .create(); diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java index 492f30c43a..34ccf8355a 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -9,18 +9,23 @@ import io.mantisrx.runtime.MantisJob; import io.mantisrx.runtime.MantisJobProvider; import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.ScalarToScalar; -import lombok.extern.slf4j.Slf4j; +import io.mantisrx.runtime.sink.Sink; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; -@Slf4j +@NoArgsConstructor +@AllArgsConstructor public class LogCollectingJob extends MantisJobProvider { + private Sink sink = new LogSink(); + @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) - .sink(new LogSink()) + .sink(sink) .metadata(new Metadata.Builder().build()) .create(); diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java index 0eeb7ea086..e0e3c4f9fa 100644 --- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -5,15 +5,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.mantisrx.runtime.codec.JsonType; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; @Getter +@NoArgsConstructor @AllArgsConstructor public class LogAggregate implements JsonType { private static final ObjectMapper mapper = new ObjectMapper(); - private final Integer count; - private final String level; + private Integer count; + private String level; public String toJsonString() { try { diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java new file mode 100644 index 0000000000..b9b16e2146 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java @@ -0,0 +1,56 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.Sinks; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogAggregationJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7382; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> { + logAggregateObservable.subscribe(); + Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable); + })); + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogAggregate.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogAggregate() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() { + LogAggregate logAggregate = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel())); + assertTrue(logAggregate.getCount() > 0); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java new file mode 100644 index 0000000000..87e0c194b5 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java @@ -0,0 +1,73 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import rx.Observable; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogCollectingJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7381; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + + start(new LogCollectingJob(new LogSink() { + + @Override + public void call(Context context, PortRequest portRequest, Observable observable) { + super.call(context, new PortRequest(PORT), observable); + } + + })); + + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogEvent.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogEvent() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogEvent() { + LogEvent logEvent = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel())); + assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage())); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() { + getSinkStream(SINK_URL + "?filter=login") + .take(7) + .toStream().forEach( + logEvent -> assertEquals("login attempt", logEvent.getMessage()) + ); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java new file mode 100644 index 0000000000..89425299a4 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java @@ -0,0 +1,49 @@ +package com.baeldung.netflix.mantis.job; + +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.util.retry.Retry; + +import java.time.Duration; + +public abstract class MantisJobTestBase { + + private static Job jobInstance; + Flux sinkStream; + + public abstract String getSinkUrl(); + public abstract Class getEventType(); + + @BeforeEach + void setUp() { + sinkStream = getSinkStream(getSinkUrl()); + } + + @AfterAll + static void afterAll() { + stopJob(); + } + + protected Flux getSinkStream(String sinkUrl) { + return WebClient.builder().build().get() + .uri(sinkUrl) + .retrieve() + .bodyToFlux(getEventType()) + .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000))); + } + + static void start(MantisJobProvider job) { + jobInstance = job.getJobInstance(); + new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start(); + } + + static void stopJob() { + jobInstance.getLifecycle().shutdown(); + } + +}