RxJava Vertx refactor (#2531)

* Refactor

* Refactor
This commit is contained in:
Grzegorz Piwowarek 2017-08-31 15:03:21 +02:00 committed by GitHub
parent d1a1322bcc
commit 78e47f104f
5 changed files with 115 additions and 117 deletions

View File

@ -10,7 +10,6 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>vertx-and-rxjava</artifactId>
<build>

View File

@ -1,94 +0,0 @@
package com.baeldung;
import io.reactivex.Flowable;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.FileSystem;
import io.vertx.reactivex.core.http.HttpClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZonedDateTime;
import static com.baeldung.MetaWeatherClient.getDataByPlaceId;
import static com.baeldung.MetaWeatherClient.searchByCityName;
public class VertxWithRxJavaTest {
private Vertx vertx;
private HttpClient httpClient;
private FileSystem fileSystem;
static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class);
@Before public void setUp() {
vertx = io.vertx.reactivex.core.Vertx.vertx();
httpClient = vertx.createHttpClient();
fileSystem = vertx.fileSystem();
}
@After public void tearDown() {
vertx.close();
}
@Test public void lightLengthTest() throws InterruptedException {
// read the file that contains one city name per line
fileSystem
.rxReadFile("cities.txt").toFlowable()
.doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
// split file content in lines to obtain one city per line
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
.doOnNext(city -> log.info("City from file: '{}'", city))
// discard cities that are commented out with a leading '#'
.filter(city -> !city.startsWith("#"))
.doOnNext(city -> log.info("City that survived filtering: '{}'", city))
// for each city sends a request to obtain its 'woeid'
// and collect the buffer from the answer
.flatMap(city -> searchByCityName(httpClient, city) )
.flatMap(response -> response.toFlowable())
.doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
// get the woeid of each city
.map(cityBuffer -> cityBuffer
.toJsonArray()
.getJsonObject(0)
.getLong("woeid"))
// use the id to ask for data
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(response -> response
.toObservable()
.reduce(
Buffer.buffer(),
(total, newBuf) -> total.appendBuffer( newBuf )).toFlowable() )
// get the JSON object out of the response
.doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
.map(buffer -> buffer.toJsonObject())
// map the JSON in a POJO
.map(json -> {
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
String cityName = json.getString("title");
return new CityAndDayLength(
cityName,sunSet.toEpochSecond() - sunRise.toEpochSecond());
})
// consume the events
.subscribe(
cityAndDayLength -> System.out.println(cityAndDayLength),
ex -> ex.printStackTrace());
// enough to give time to complete the execution
Thread.sleep(20000);
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung;
package com.baeldung.weather;
import java.text.MessageFormat;
@ -7,12 +7,13 @@ class CityAndDayLength {
private final String city;
private final double dayLengthInHours;
public CityAndDayLength(String city, long dayLengthInSeconds) {
CityAndDayLength(String city, long dayLengthInSeconds) {
this.city = city;
this.dayLengthInHours = dayLengthInSeconds / (60.0 * 60.0);
}
@Override public String toString() {
@Override
public String toString() {
return MessageFormat.format("In {0} there are {1,number,#0.0} hours of light.", city, dayLengthInHours);
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung;
package com.baeldung.weather;
import io.reactivex.Flowable;
import io.vertx.core.http.RequestOptions;
@ -8,7 +8,7 @@ import io.vertx.reactivex.core.http.HttpClientResponse;
import static java.lang.String.format;
public class MetaWeatherClient {
class MetaWeatherClient {
private static RequestOptions metawether = new RequestOptions()
.setHost("www.metaweather.com")

View File

@ -0,0 +1,92 @@
package com.baeldung.weather;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.FileSystem;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZonedDateTime;
import static com.baeldung.weather.MetaWeatherClient.getDataByPlaceId;
import static com.baeldung.weather.MetaWeatherClient.searchByCityName;
public class VertxWithRxJavaTest {
private Vertx vertx;
private HttpClient httpClient;
private FileSystem fileSystem;
private static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class);
@Before
public void setUp() {
vertx = io.vertx.reactivex.core.Vertx.vertx();
httpClient = vertx.createHttpClient();
fileSystem = vertx.fileSystem();
}
@After
public void tearDown() {
vertx.close();
}
@Test
public void shouldDisplayLightLenghts() throws InterruptedException {
// read the file that contains one city name per line
fileSystem
.rxReadFile("cities.txt").toFlowable()
.doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
.doOnNext(city -> log.info("City from file: '{}'", city))
.filter(city -> !city.startsWith("#"))
.doOnNext(city -> log.info("City that survived filtering: '{}'", city))
.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)
.doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
.map(extractingWoeid())
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())
.doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
.map(Buffer::toJsonObject)
.map(toCityAndDayLength())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(20000); // enough to give time to complete the execution
}
private static Function<HttpClientResponse, Publisher<? extends Buffer>> toBufferFlowable() {
return response -> response
.toObservable()
.reduce(
Buffer.buffer(),
Buffer::appendBuffer).toFlowable();
}
private static Function<Buffer, Long> extractingWoeid() {
return cityBuffer -> cityBuffer
.toJsonArray()
.getJsonObject(0)
.getLong("woeid");
}
private static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
return json -> {
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
String cityName = json.getString("title");
return new CityAndDayLength(
cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
};
}
}