From 78e47f104fb65a40a374b76866f0c9a689ec6361 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 31 Aug 2017 15:03:21 +0200 Subject: [PATCH] RxJava Vertx refactor (#2531) * Refactor * Refactor --- vertx-and-rxjava/pom.xml | 1 - .../com/baeldung/VertxWithRxJavaTest.java | 94 ------------------- .../{ => weather}/CityAndDayLength.java | 7 +- .../{ => weather}/MetaWeatherClient.java | 38 ++++---- .../baeldung/weather/VertxWithRxJavaTest.java | 92 ++++++++++++++++++ 5 files changed, 115 insertions(+), 117 deletions(-) delete mode 100644 vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java rename vertx-and-rxjava/src/test/java/com/baeldung/{ => weather}/CityAndDayLength.java (72%) rename vertx-and-rxjava/src/test/java/com/baeldung/{ => weather}/MetaWeatherClient.java (50%) create mode 100644 vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java diff --git a/vertx-and-rxjava/pom.xml b/vertx-and-rxjava/pom.xml index 47510e60f7..9c2c9bfd48 100644 --- a/vertx-and-rxjava/pom.xml +++ b/vertx-and-rxjava/pom.xml @@ -10,7 +10,6 @@ 4.0.0 - com.baeldung vertx-and-rxjava diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java b/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java deleted file mode 100644 index f02792c3d7..0000000000 --- a/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.baeldung; - -import io.reactivex.Flowable; -import io.vertx.reactivex.core.Vertx; -import io.vertx.reactivex.core.buffer.Buffer; -import io.vertx.reactivex.core.file.FileSystem; -import io.vertx.reactivex.core.http.HttpClient; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.ZonedDateTime; - -import static com.baeldung.MetaWeatherClient.getDataByPlaceId; -import static com.baeldung.MetaWeatherClient.searchByCityName; - -public class VertxWithRxJavaTest { - - private Vertx vertx; - private HttpClient httpClient; - private FileSystem fileSystem; - static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class); - - @Before public void setUp() { - vertx = io.vertx.reactivex.core.Vertx.vertx(); - httpClient = vertx.createHttpClient(); - fileSystem = vertx.fileSystem(); - } - - @After public void tearDown() { - vertx.close(); - } - - @Test public void lightLengthTest() throws InterruptedException { - - // read the file that contains one city name per line - fileSystem - .rxReadFile("cities.txt").toFlowable() - .doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer)) - - // split file content in lines to obtain one city per line - .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n"))) - .doOnNext(city -> log.info("City from file: '{}'", city)) - - // discard cities that are commented out with a leading '#' - .filter(city -> !city.startsWith("#")) - .doOnNext(city -> log.info("City that survived filtering: '{}'", city)) - - // for each city sends a request to obtain its 'woeid' - // and collect the buffer from the answer - .flatMap(city -> searchByCityName(httpClient, city) ) - .flatMap(response -> response.toFlowable()) - .doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer)) - - // get the woeid of each city - .map(cityBuffer -> cityBuffer - .toJsonArray() - .getJsonObject(0) - .getLong("woeid")) - - // use the id to ask for data - .flatMap(cityId -> getDataByPlaceId(httpClient, cityId)) - .flatMap(response -> response - .toObservable() - .reduce( - Buffer.buffer(), - (total, newBuf) -> total.appendBuffer( newBuf )).toFlowable() ) - - // get the JSON object out of the response - .doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer)) - .map(buffer -> buffer.toJsonObject()) - - // map the JSON in a POJO - .map(json -> { - ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise")); - ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set")); - String cityName = json.getString("title"); - return new CityAndDayLength( - cityName,sunSet.toEpochSecond() - sunRise.toEpochSecond()); - }) - - // consume the events - .subscribe( - cityAndDayLength -> System.out.println(cityAndDayLength), - ex -> ex.printStackTrace()); - - // enough to give time to complete the execution - Thread.sleep(20000); - - } - -} diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java similarity index 72% rename from vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java rename to vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java index dec4aa4852..f725c8255e 100644 --- a/vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java +++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java @@ -1,4 +1,4 @@ -package com.baeldung; +package com.baeldung.weather; import java.text.MessageFormat; @@ -7,12 +7,13 @@ class CityAndDayLength { private final String city; private final double dayLengthInHours; - public CityAndDayLength(String city, long dayLengthInSeconds) { + CityAndDayLength(String city, long dayLengthInSeconds) { this.city = city; this.dayLengthInHours = dayLengthInSeconds / (60.0 * 60.0); } - @Override public String toString() { + @Override + public String toString() { return MessageFormat.format("In {0} there are {1,number,#0.0} hours of light.", city, dayLengthInHours); } } diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java similarity index 50% rename from vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java rename to vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java index 0a3e9d8713..0dc1f8ac95 100644 --- a/vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java +++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java @@ -1,4 +1,4 @@ -package com.baeldung; +package com.baeldung.weather; import io.reactivex.Flowable; import io.vertx.core.http.RequestOptions; @@ -8,38 +8,38 @@ import io.vertx.reactivex.core.http.HttpClientResponse; import static java.lang.String.format; -public class MetaWeatherClient { +class MetaWeatherClient { private static RequestOptions metawether = new RequestOptions() - .setHost("www.metaweather.com") - .setPort(443) - .setSsl(true); + .setHost("www.metaweather.com") + .setPort(443) + .setSsl(true); /** * @return A flowable backed by vertx that automatically sends an HTTP request at soon as the first subscription is received. */ -private static Flowable autoPerformingReq(HttpClient httpClient, String uri) { - HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri)); - return req.toFlowable() - .doOnSubscribe(subscription -> req.end()); -} + private static Flowable autoPerformingReq(HttpClient httpClient, String uri) { + HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri)); + return req.toFlowable() + .doOnSubscribe(subscription -> req.end()); + } -static Flowable searchByCityName(HttpClient httpClient, String cityName) { - HttpClientRequest req = httpClient.get( - new RequestOptions() + static Flowable searchByCityName(HttpClient httpClient, String cityName) { + HttpClientRequest req = httpClient.get( + new RequestOptions() .setHost("www.metaweather.com") .setPort(443) .setSsl(true) .setURI(format("/api/location/search/?query=%s", cityName))); - return req - .toFlowable() - .doOnSubscribe(subscription -> req.end()); -} + return req + .toFlowable() + .doOnSubscribe(subscription -> req.end()); + } static Flowable getDataByPlaceId(HttpClient httpClient, long placeId) { return autoPerformingReq( - httpClient, - format("/api/location/%s/", placeId)); + httpClient, + format("/api/location/%s/", placeId)); } } diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java new file mode 100644 index 0000000000..ee5a24c13f --- /dev/null +++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java @@ -0,0 +1,92 @@ +package com.baeldung.weather; + +import io.reactivex.Flowable; +import io.reactivex.functions.Function; +import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.core.file.FileSystem; +import io.vertx.reactivex.core.http.HttpClient; +import io.vertx.reactivex.core.http.HttpClientResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZonedDateTime; + +import static com.baeldung.weather.MetaWeatherClient.getDataByPlaceId; +import static com.baeldung.weather.MetaWeatherClient.searchByCityName; + +public class VertxWithRxJavaTest { + + private Vertx vertx; + private HttpClient httpClient; + private FileSystem fileSystem; + private static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class); + + @Before + public void setUp() { + vertx = io.vertx.reactivex.core.Vertx.vertx(); + httpClient = vertx.createHttpClient(); + fileSystem = vertx.fileSystem(); + } + + @After + public void tearDown() { + vertx.close(); + } + + @Test + public void shouldDisplayLightLenghts() throws InterruptedException { + + // read the file that contains one city name per line + fileSystem + .rxReadFile("cities.txt").toFlowable() + .doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer)) + .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n"))) + .doOnNext(city -> log.info("City from file: '{}'", city)) + .filter(city -> !city.startsWith("#")) + .doOnNext(city -> log.info("City that survived filtering: '{}'", city)) + .flatMap(city -> searchByCityName(httpClient, city)) + .flatMap(HttpClientResponse::toFlowable) + .doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer)) + .map(extractingWoeid()) + .flatMap(cityId -> getDataByPlaceId(httpClient, cityId)) + .flatMap(toBufferFlowable()) + .doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer)) + .map(Buffer::toJsonObject) + .map(toCityAndDayLength()) + .subscribe(System.out::println, Throwable::printStackTrace); + + Thread.sleep(20000); // enough to give time to complete the execution + } + + private static Function> toBufferFlowable() { + return response -> response + .toObservable() + .reduce( + Buffer.buffer(), + Buffer::appendBuffer).toFlowable(); + } + + private static Function extractingWoeid() { + return cityBuffer -> cityBuffer + .toJsonArray() + .getJsonObject(0) + .getLong("woeid"); + } + + private static Function toCityAndDayLength() { + return json -> { + ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise")); + ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set")); + String cityName = json.getString("title"); + return new CityAndDayLength( + cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond()); + }; + } + +}