From d4dbc6a526ea171e895f7a8375746929dc1eb774 Mon Sep 17 00:00:00 2001 From: johnA1331 <53036378+johnA1331@users.noreply.github.com> Date: Wed, 13 Oct 2021 23:45:08 +0800 Subject: [PATCH 1/5] Create README.md --- maven-modules/maven-generate-war/README.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 maven-modules/maven-generate-war/README.md diff --git a/maven-modules/maven-generate-war/README.md b/maven-modules/maven-generate-war/README.md new file mode 100644 index 0000000000..1e74a087ae --- /dev/null +++ b/maven-modules/maven-generate-war/README.md @@ -0,0 +1,3 @@ +### Relevant Articles: + +- [Generate a WAR File in Maven](https://www.baeldung.com/maven-generate-war-file) From 68e0a1a0940d860bd8be711bd2a802af77f64d1e Mon Sep 17 00:00:00 2001 From: johnA1331 <53036378+johnA1331@users.noreply.github.com> Date: Wed, 13 Oct 2021 23:47:53 +0800 Subject: [PATCH 2/5] Update README.md --- quarkus-vs-springboot/README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/quarkus-vs-springboot/README.md b/quarkus-vs-springboot/README.md index 35fc8eb5eb..05eaabb923 100644 --- a/quarkus-vs-springboot/README.md +++ b/quarkus-vs-springboot/README.md @@ -92,4 +92,8 @@ Then, once again, you can also run both application and DB from docker, using: docker-compose -f src/main/docker/quarkus.yml up ``` -Now you have all you need to reproduce the tests with your machine. \ No newline at end of file +Now you have all you need to reproduce the tests with your machine. + +### Relevant Articles: + +- [Spring Boot vs Quarkus](https://www.baeldung.com/spring-boot-vs-quarkus) From d64b8f1b2cc5397a74a315f5c5815f2ecd6c1864 Mon Sep 17 00:00:00 2001 From: johnA1331 <53036378+johnA1331@users.noreply.github.com> Date: Wed, 13 Oct 2021 23:55:55 +0800 Subject: [PATCH 3/5] Update README.md --- core-java-modules/core-java-annotations/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/core-java-modules/core-java-annotations/README.md b/core-java-modules/core-java-annotations/README.md index 18f5589771..a8a9ac3b16 100644 --- a/core-java-modules/core-java-annotations/README.md +++ b/core-java-modules/core-java-annotations/README.md @@ -12,3 +12,4 @@ - [Efficient Word Frequency Calculator in Java](https://www.baeldung.com/java-word-frequency) - [Why Missing Annotations Don’t Cause ClassNotFoundException](https://www.baeldung.com/classnotfoundexception-missing-annotation) - [Valid @SuppressWarnings Warning Names](https://www.baeldung.com/java-suppresswarnings-valid-names) +- [Get a Field’s Annotations Using Reflection](https://www.baeldung.com/java-get-field-annotations) From 2dfdb515922a1f0be1be7af1fef7c03592d24b10 Mon Sep 17 00:00:00 2001 From: lucaCambi77 Date: Wed, 13 Oct 2021 18:21:35 +0200 Subject: [PATCH 4/5] feat: env variable prefixes spring boot app (#11323) --- .../baeldung/prefix/PrefixApplication.java | 14 ++++++++++++++ .../com/baeldung/prefix/PrefixController.java | 19 +++++++++++++++++++ .../src/main/resources/templates/prefix.html | 9 +++++++++ 3 files changed, 42 insertions(+) create mode 100644 spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixApplication.java create mode 100644 spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixController.java create mode 100644 spring-boot-modules/spring-boot-environment/src/main/resources/templates/prefix.html diff --git a/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixApplication.java b/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixApplication.java new file mode 100644 index 0000000000..29fe3d8930 --- /dev/null +++ b/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixApplication.java @@ -0,0 +1,14 @@ +package com.baeldung.prefix; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class PrefixApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(PrefixApplication.class); + application.setEnvironmentPrefix("prefix"); + application.run(args); + } +} diff --git a/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixController.java b/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixController.java new file mode 100644 index 0000000000..00b728c7ae --- /dev/null +++ b/spring-boot-modules/spring-boot-environment/src/main/java/com/baeldung/prefix/PrefixController.java @@ -0,0 +1,19 @@ +package com.baeldung.prefix; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Controller; +import org.springframework.ui.Model; +import org.springframework.web.bind.annotation.GetMapping; + +@Controller +public class PrefixController { + + @Value(value = "${server.port}") + private int serverPort; + + @GetMapping("/prefix") + public String getServerPortInfo(final Model model) { + model.addAttribute("serverPort", serverPort); + return "prefix"; + } +} diff --git a/spring-boot-modules/spring-boot-environment/src/main/resources/templates/prefix.html b/spring-boot-modules/spring-boot-environment/src/main/resources/templates/prefix.html new file mode 100644 index 0000000000..7bb5a76537 --- /dev/null +++ b/spring-boot-modules/spring-boot-environment/src/main/resources/templates/prefix.html @@ -0,0 +1,9 @@ + + + + Prefix Example Page + + +It is working as we expected. Your server is running at port : + + \ No newline at end of file From 47abbf654b5bee31721b9d40d0115471b114142d Mon Sep 17 00:00:00 2001 From: psevestre Date: Thu, 14 Oct 2021 01:04:19 -0300 Subject: [PATCH 5/5] BAEL-772 Reactive Streams API with Ratpack (#11328) * [BAEL-4849] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Remove extra comments * [BAEL-4020] Article code * [BAEL-722] Article code --- .../main/java/com/baeldung/model/Quote.java | 107 +++++++++ .../rxjava/service/QuotesService.java | 44 ++++ .../spring/EmbedRatpackStreamsApp.java | 206 ++++++++++++++++++ .../baeldung/ratpack/CompliantPublisher.java | 63 ++++++ .../baeldung/ratpack/LoggingSubscriber.java | 67 ++++++ .../ratpack/NonCompliantPublisher.java | 46 ++++ .../ratpack/RatpackStreamsUnitTest.java | 140 ++++++++++++ 7 files changed, 673 insertions(+) create mode 100644 ratpack/src/main/java/com/baeldung/model/Quote.java create mode 100644 ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java create mode 100644 ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java diff --git a/ratpack/src/main/java/com/baeldung/model/Quote.java b/ratpack/src/main/java/com/baeldung/model/Quote.java new file mode 100644 index 0000000000..009a85fa11 --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/model/Quote.java @@ -0,0 +1,107 @@ +package com.baeldung.model; + +import java.time.Instant; + +public class Quote { + + private Instant ts; + private String symbol; + private double value; + + public Quote() {} + + + public Quote(Instant ts, String symbol, double value) { + this.ts = ts; + this.symbol = symbol; + this.value = value; + } + + + /** + * @return the ts + */ + public Instant getTs() { + return ts; + } + + /** + * @param ts the ts to set + */ + public void setTs(Instant ts) { + this.ts = ts; + } + + /** + * @return the symbol + */ + public String getSymbol() { + return symbol; + } + + /** + * @param symbol the symbol to set + */ + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + /** + * @return the value + */ + public double getValue() { + return value; + } + + /** + * @param value the value to set + */ + public void setValue(double value) { + this.value = value; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((symbol == null) ? 0 : symbol.hashCode()); + result = prime * result + ((ts == null) ? 0 : ts.hashCode()); + long temp; + temp = Double.doubleToLongBits(value); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Quote other = (Quote) obj; + if (symbol == null) { + if (other.symbol != null) + return false; + } else if (!symbol.equals(other.symbol)) + return false; + if (ts == null) { + if (other.ts != null) + return false; + } else if (!ts.equals(other.ts)) + return false; + if (Double.doubleToLongBits(value) != Double.doubleToLongBits(other.value)) + return false; + return true; + } + + + @Override + public String toString() { + return "Quote [ts=" + ts + ", symbol=" + symbol + ", value=" + value + "]"; + } + + +} diff --git a/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java b/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java new file mode 100644 index 0000000000..7c073ee1de --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java @@ -0,0 +1,44 @@ +package com.baeldung.rxjava.service; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; + +import org.reactivestreams.Publisher; + +import com.baeldung.model.Quote; + +import ratpack.stream.Streams; + +public class QuotesService { + + private final ScheduledExecutorService executorService; + private static Random rnd = new Random(); + private static String[] symbols = new String[] { + "MSFT", + "ORCL", + "GOOG", + "AAPL", + "CSCO" + }; + + public QuotesService(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + public Publisher newTicker() { + return Streams.periodically(executorService, Duration.ofSeconds(2), (t) -> { + + return randomQuote(); + }); + } + + private static Quote randomQuote() { + return new Quote ( + Instant.now(), + symbols[rnd.nextInt(symbols.length)], + Math.round(rnd.nextDouble()*100) + ); + } +} diff --git a/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java b/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java new file mode 100644 index 0000000000..dc66efbecb --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java @@ -0,0 +1,206 @@ +package com.baeldung.spring; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import com.baeldung.model.Quote; +import com.baeldung.rxjava.service.QuotesService; + +import groovy.util.logging.Slf4j; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import ratpack.func.Action; +import ratpack.handling.Chain; +import ratpack.http.ResponseChunks; +import ratpack.http.Status; +import ratpack.server.ServerConfig; +import ratpack.spring.config.EnableRatpack; +import ratpack.sse.ServerSentEvents; +import ratpack.stream.Streams; +import ratpack.stream.TransformablePublisher; +import ratpack.websocket.WebSockets; +import rx.subscriptions.Subscriptions; + +/** + * @author psevestre + */ +@SpringBootApplication +@EnableRatpack +public class EmbedRatpackStreamsApp { + + private static final Logger log = LoggerFactory.getLogger(EmbedRatpackStreamsApp.class); + + @Autowired + private QuotesService quotesService; + + private AtomicLong idSeq = new AtomicLong(0); + + + @Bean + public ScheduledExecutorService executorService() { + return Executors.newScheduledThreadPool(1); + } + + @Bean + public QuotesService quotesService(ScheduledExecutorService executor) { + return new QuotesService(executor); + } + + @Bean + public Action quotes() { + ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> { + evt + .id(Long.toString(idSeq.incrementAndGet())) + .event("quote") + .data( q -> q.toString()); + }); + + return chain -> chain.get("quotes", ctx -> ctx.render(sse)); + } + + @Bean + public Action quotesWS() { + Publisher pub = Streams.transformable(quotesService.newTicker()) + .map(Quote::toString); + return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub)); + } + + @Bean + public Action uploadFile() { + + return chain -> chain.post("upload", ctx -> { + TransformablePublisher pub = ctx.getRequest().getBodyStream(); + pub.subscribe(new Subscriber() { + private Subscription sub; + @Override + public void onSubscribe(Subscription sub) { + this.sub = sub; + sub.request(1); + } + + @Override + public void onNext(ByteBuf t) { + try { + int len = t.readableBytes(); + log.info("Got {} bytes", len); + + // Do something useful with data + + // Request next chunk + sub.request(1); + } + finally { + // DO NOT FORGET to RELEASE ! + t.release(); + } + } + + @Override + public void onError(Throwable t) { + ctx.getResponse().status(500); + } + + @Override + public void onComplete() { + ctx.getResponse().status(202); + } + }); + }); + } + + @Bean + public Action download() { + return chain -> chain.get("download", ctx -> { + ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512)); + }); + } + + @Bean + public Action downloadChunks() { + return chain -> chain.get("downloadChunks", ctx -> { + ctx.render(ResponseChunks.bufferChunks("application/octetstream", + new RandomBytesPublisher(1024,512))); + }); + } + + @Bean + public ServerConfig ratpackServerConfig() { + return ServerConfig + .builder() + .findBaseDir("public") + .build(); + } + + public static void main(String[] args) { + SpringApplication.run(EmbedRatpackStreamsApp.class, args); + } + + + public static class RandomBytesPublisher implements Publisher { + + private int bufCount; + private int bufSize; + private Random rnd = new Random(); + + + RandomBytesPublisher(int bufCount, int bufSize) { + this.bufCount = bufCount; + this.bufSize = bufSize; + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + + private boolean cancelled = false; + private boolean recurse; + private long requested = 0; + + @Override + public void request(long n) { + if ( bufCount == 0 ) { + s.onComplete(); + return; + } + + requested += n; + if ( recurse ) { + return; + } + + recurse = true; + try { + while ( requested-- > 0 && !cancelled && bufCount-- > 0 ) { + byte[] data = new byte[bufSize]; + rnd.nextBytes(data); + ByteBuf buf = Unpooled.wrappedBuffer(data); + s.onNext(buf); + } + } + finally { + recurse = false; + } + } + + @Override + public void cancel() { + cancelled = true; + } + }); + + } + } + +} diff --git a/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java b/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java new file mode 100644 index 0000000000..5526a630ff --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java @@ -0,0 +1,63 @@ +package com.baeldung.ratpack; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +// Non-thread safe !!! +class CompliantPublisher implements Publisher { + String name; + + private static final Logger log = LoggerFactory.getLogger(CompliantPublisher.class); + private long available; + + + public CompliantPublisher(long available) { + this.available = available; + } + + @Override + public void subscribe(Subscriber subscriber) { + log.info("subscribe"); + subscriber.onSubscribe(new CompliantSubscription(subscriber)); + + } + + + private class CompliantSubscription implements Subscription { + + private Subscriber subscriber; + private int recurseLevel; + private long requested; + private boolean cancelled; + + public CompliantSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + log.info("request: requested={}, available={}", n, available); + requested += n; + if ( recurseLevel > 0 ) { + return; + } + + recurseLevel++; + for (int i = 0 ; i < (requested) && !cancelled && available > 0 ; i++, available-- ) { + subscriber.onNext(i); + } + subscriber.onComplete(); + } + + @Override + public void cancel() { + cancelled = true; + } + + } + +} \ No newline at end of file diff --git a/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java b/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java new file mode 100644 index 0000000000..0d58b7c05e --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java @@ -0,0 +1,67 @@ +package com.baeldung.ratpack; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LoggingSubscriber implements Subscriber { + private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class); + + private Subscription subscription; + private long requested; + private long received; + private CountDownLatch finished = new CountDownLatch(1); + + @Override + public void onComplete() { + log.info("onComplete: sub={}", subscription.hashCode()); + finished.countDown(); + } + + @Override + public void onError(Throwable t) { + log.error("Error: sub={}, message={}", subscription.hashCode(), t.getMessage(),t); + finished.countDown(); + } + + @Override + public void onNext(T value) { + log.info("onNext: sub={}, value={}", subscription.hashCode(), value); + this.received++; + this.requested++; + subscription.request(1); + } + + @Override + public void onSubscribe(Subscription sub) { + log.info("onSubscribe: sub={}", sub.hashCode()); + this.subscription = sub; + this.received = 0; + this.requested = 1; + sub.request(1); + } + + + public long getRequested() { + return requested; + } + + public long getReceived() { + return received; + } + + public void block() { + try { + finished.await(10, TimeUnit.SECONDS); + } + catch(InterruptedException iex) { + throw new RuntimeException(iex); + } + } + +} diff --git a/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java b/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java new file mode 100644 index 0000000000..03b94d429d --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java @@ -0,0 +1,46 @@ +package com.baeldung.ratpack; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class NonCompliantPublisher implements Publisher { + + private static final Logger log = LoggerFactory.getLogger(NonCompliantPublisher.class); + + @Override + public void subscribe(Subscriber subscriber) { + log.info("subscribe"); + subscriber.onSubscribe(new NonCompliantSubscription(subscriber)); + } + + private class NonCompliantSubscription implements Subscription { + private Subscriber subscriber; + private int recurseLevel = 0; + + public NonCompliantSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + log.info("request: n={}", n); + if ( recurseLevel > 0 ) { + return; + } + + recurseLevel++; + for (int i = 0 ; i < (n + 5) ; i ++ ) { + subscriber.onNext(i); + } + subscriber.onComplete(); + } + + @Override + public void cancel() { + } + } +} \ No newline at end of file diff --git a/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java b/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java new file mode 100644 index 0000000000..54cc71c328 --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java @@ -0,0 +1,140 @@ +package com.baeldung.ratpack; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ratpack.exec.ExecResult; +import ratpack.func.Action; +import ratpack.stream.StreamEvent; +import ratpack.stream.Streams; +import ratpack.stream.TransformablePublisher; +import ratpack.test.exec.ExecHarness; + +public class RatpackStreamsUnitTest { + + private static Logger log = LoggerFactory.getLogger(RatpackStreamsUnitTest.class); + + @Test + public void whenPublish_thenSuccess() { + + Publisher pub = Streams.publish(Arrays.asList("hello", "hello again")); + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + } + + + @Test + public void whenYield_thenSuccess() { + + Publisher pub = Streams.yield((t) -> { + return t.getRequestNum() < 5 ? "hello" : null; + }); + + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + assertEquals(5, sub.getReceived()); + } + + @Test + public void whenPeriodic_thenSuccess() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Publisher pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> { + return t < 5 ? String.format("hello %d",t): null; + }); + + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + assertEquals(5, sub.getReceived()); + } + + @Test + public void whenMap_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.yield( t -> { + return t.getRequestNum() < 5 ? t.getRequestNum() : null; + }) + .map(v -> String.format("item %d", v)); + + ExecResult> result = ExecHarness.yieldSingle((c) -> pub.toList() ); + assertTrue("should succeed", result.isSuccess()); + assertEquals("should have 5 items",5,result.getValue().size()); + } + + @Test + public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new NonCompliantPublisher()) + .wiretap(new LoggingAction("before buffer")) + .buffer() + .wiretap(new LoggingAction("after buffer")) + .take(1); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); + } + + @Test + public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception { + TransformablePublisher pub = Streams.transformable(new NonCompliantPublisher()) + .wiretap(new LoggingAction("")) + .take(1); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); + } + +@Test +public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new CompliantPublisher(10)) + .wiretap(new LoggingAction("")); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); +} + +@Test +public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new CompliantPublisher(10)) + .wiretap(new LoggingAction("before batch")) + .batch(5, Action.noop()) + .wiretap(new LoggingAction("after batch")); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); +} + + private static class LoggingAction implements Action>{ + private final String label; + + public LoggingAction(String label) { + this.label = label; + } + + @Override + public void execute(StreamEvent e) throws Exception { + log.info("{}: event={}", label,e); + } + + } + +}