diff --git a/vertx-and-rxjava/pom.xml b/vertx-and-rxjava/pom.xml
index 47510e60f7..9c2c9bfd48 100644
--- a/vertx-and-rxjava/pom.xml
+++ b/vertx-and-rxjava/pom.xml
@@ -10,7 +10,6 @@
4.0.0
- com.baeldung
vertx-and-rxjava
diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java b/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java
deleted file mode 100644
index f02792c3d7..0000000000
--- a/vertx-and-rxjava/src/test/java/com/baeldung/VertxWithRxJavaTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.baeldung;
-
-import io.reactivex.Flowable;
-import io.vertx.reactivex.core.Vertx;
-import io.vertx.reactivex.core.buffer.Buffer;
-import io.vertx.reactivex.core.file.FileSystem;
-import io.vertx.reactivex.core.http.HttpClient;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.ZonedDateTime;
-
-import static com.baeldung.MetaWeatherClient.getDataByPlaceId;
-import static com.baeldung.MetaWeatherClient.searchByCityName;
-
-public class VertxWithRxJavaTest {
-
- private Vertx vertx;
- private HttpClient httpClient;
- private FileSystem fileSystem;
- static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class);
-
- @Before public void setUp() {
- vertx = io.vertx.reactivex.core.Vertx.vertx();
- httpClient = vertx.createHttpClient();
- fileSystem = vertx.fileSystem();
- }
-
- @After public void tearDown() {
- vertx.close();
- }
-
- @Test public void lightLengthTest() throws InterruptedException {
-
- // read the file that contains one city name per line
- fileSystem
- .rxReadFile("cities.txt").toFlowable()
- .doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
-
- // split file content in lines to obtain one city per line
- .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
- .doOnNext(city -> log.info("City from file: '{}'", city))
-
- // discard cities that are commented out with a leading '#'
- .filter(city -> !city.startsWith("#"))
- .doOnNext(city -> log.info("City that survived filtering: '{}'", city))
-
- // for each city sends a request to obtain its 'woeid'
- // and collect the buffer from the answer
- .flatMap(city -> searchByCityName(httpClient, city) )
- .flatMap(response -> response.toFlowable())
- .doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
-
- // get the woeid of each city
- .map(cityBuffer -> cityBuffer
- .toJsonArray()
- .getJsonObject(0)
- .getLong("woeid"))
-
- // use the id to ask for data
- .flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
- .flatMap(response -> response
- .toObservable()
- .reduce(
- Buffer.buffer(),
- (total, newBuf) -> total.appendBuffer( newBuf )).toFlowable() )
-
- // get the JSON object out of the response
- .doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
- .map(buffer -> buffer.toJsonObject())
-
- // map the JSON in a POJO
- .map(json -> {
- ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
- ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
- String cityName = json.getString("title");
- return new CityAndDayLength(
- cityName,sunSet.toEpochSecond() - sunRise.toEpochSecond());
- })
-
- // consume the events
- .subscribe(
- cityAndDayLength -> System.out.println(cityAndDayLength),
- ex -> ex.printStackTrace());
-
- // enough to give time to complete the execution
- Thread.sleep(20000);
-
- }
-
-}
diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java
similarity index 72%
rename from vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java
rename to vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java
index dec4aa4852..f725c8255e 100644
--- a/vertx-and-rxjava/src/test/java/com/baeldung/CityAndDayLength.java
+++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/CityAndDayLength.java
@@ -1,4 +1,4 @@
-package com.baeldung;
+package com.baeldung.weather;
import java.text.MessageFormat;
@@ -7,12 +7,13 @@ class CityAndDayLength {
private final String city;
private final double dayLengthInHours;
- public CityAndDayLength(String city, long dayLengthInSeconds) {
+ CityAndDayLength(String city, long dayLengthInSeconds) {
this.city = city;
this.dayLengthInHours = dayLengthInSeconds / (60.0 * 60.0);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return MessageFormat.format("In {0} there are {1,number,#0.0} hours of light.", city, dayLengthInHours);
}
}
diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java
similarity index 50%
rename from vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java
rename to vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java
index 0a3e9d8713..0dc1f8ac95 100644
--- a/vertx-and-rxjava/src/test/java/com/baeldung/MetaWeatherClient.java
+++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/MetaWeatherClient.java
@@ -1,4 +1,4 @@
-package com.baeldung;
+package com.baeldung.weather;
import io.reactivex.Flowable;
import io.vertx.core.http.RequestOptions;
@@ -8,38 +8,38 @@ import io.vertx.reactivex.core.http.HttpClientResponse;
import static java.lang.String.format;
-public class MetaWeatherClient {
+class MetaWeatherClient {
private static RequestOptions metawether = new RequestOptions()
- .setHost("www.metaweather.com")
- .setPort(443)
- .setSsl(true);
+ .setHost("www.metaweather.com")
+ .setPort(443)
+ .setSsl(true);
/**
* @return A flowable backed by vertx that automatically sends an HTTP request at soon as the first subscription is received.
*/
-private static Flowable autoPerformingReq(HttpClient httpClient, String uri) {
- HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri));
- return req.toFlowable()
- .doOnSubscribe(subscription -> req.end());
-}
+ private static Flowable autoPerformingReq(HttpClient httpClient, String uri) {
+ HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri));
+ return req.toFlowable()
+ .doOnSubscribe(subscription -> req.end());
+ }
-static Flowable searchByCityName(HttpClient httpClient, String cityName) {
- HttpClientRequest req = httpClient.get(
- new RequestOptions()
+ static Flowable searchByCityName(HttpClient httpClient, String cityName) {
+ HttpClientRequest req = httpClient.get(
+ new RequestOptions()
.setHost("www.metaweather.com")
.setPort(443)
.setSsl(true)
.setURI(format("/api/location/search/?query=%s", cityName)));
- return req
- .toFlowable()
- .doOnSubscribe(subscription -> req.end());
-}
+ return req
+ .toFlowable()
+ .doOnSubscribe(subscription -> req.end());
+ }
static Flowable getDataByPlaceId(HttpClient httpClient, long placeId) {
return autoPerformingReq(
- httpClient,
- format("/api/location/%s/", placeId));
+ httpClient,
+ format("/api/location/%s/", placeId));
}
}
diff --git a/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java b/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java
new file mode 100644
index 0000000000..ee5a24c13f
--- /dev/null
+++ b/vertx-and-rxjava/src/test/java/com/baeldung/weather/VertxWithRxJavaTest.java
@@ -0,0 +1,92 @@
+package com.baeldung.weather;
+
+import io.reactivex.Flowable;
+import io.reactivex.functions.Function;
+import io.vertx.core.json.JsonObject;
+import io.vertx.reactivex.core.Vertx;
+import io.vertx.reactivex.core.buffer.Buffer;
+import io.vertx.reactivex.core.file.FileSystem;
+import io.vertx.reactivex.core.http.HttpClient;
+import io.vertx.reactivex.core.http.HttpClientResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZonedDateTime;
+
+import static com.baeldung.weather.MetaWeatherClient.getDataByPlaceId;
+import static com.baeldung.weather.MetaWeatherClient.searchByCityName;
+
+public class VertxWithRxJavaTest {
+
+ private Vertx vertx;
+ private HttpClient httpClient;
+ private FileSystem fileSystem;
+ private static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class);
+
+ @Before
+ public void setUp() {
+ vertx = io.vertx.reactivex.core.Vertx.vertx();
+ httpClient = vertx.createHttpClient();
+ fileSystem = vertx.fileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ vertx.close();
+ }
+
+ @Test
+ public void shouldDisplayLightLenghts() throws InterruptedException {
+
+ // read the file that contains one city name per line
+ fileSystem
+ .rxReadFile("cities.txt").toFlowable()
+ .doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
+ .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
+ .doOnNext(city -> log.info("City from file: '{}'", city))
+ .filter(city -> !city.startsWith("#"))
+ .doOnNext(city -> log.info("City that survived filtering: '{}'", city))
+ .flatMap(city -> searchByCityName(httpClient, city))
+ .flatMap(HttpClientResponse::toFlowable)
+ .doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
+ .map(extractingWoeid())
+ .flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
+ .flatMap(toBufferFlowable())
+ .doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
+ .map(Buffer::toJsonObject)
+ .map(toCityAndDayLength())
+ .subscribe(System.out::println, Throwable::printStackTrace);
+
+ Thread.sleep(20000); // enough to give time to complete the execution
+ }
+
+ private static Function> toBufferFlowable() {
+ return response -> response
+ .toObservable()
+ .reduce(
+ Buffer.buffer(),
+ Buffer::appendBuffer).toFlowable();
+ }
+
+ private static Function extractingWoeid() {
+ return cityBuffer -> cityBuffer
+ .toJsonArray()
+ .getJsonObject(0)
+ .getLong("woeid");
+ }
+
+ private static Function toCityAndDayLength() {
+ return json -> {
+ ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
+ ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
+ String cityName = json.getString("title");
+ return new CityAndDayLength(
+ cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
+ };
+ }
+
+}