Bael 776 vert.x and rxjava (#2529)
* Burlap & Hessian server added * Burlap & Hessian client work * Fixed main * Fixed formatting * Spring Remote example based on Burlap & Hessian runs in a JUnit test * Fixed main * Fixed formatting * Spring Remote example based on Burlap & Hessian runs in a JUnit test * Spring Remote example based on Burlap & Hessian runs in a JUnit test * Burlap & Hessian client work * Fixed main * Fixed main * Fixed formatting * Fixed formatting * Spring Remote example based on Burlap & Hessian runs in a JUnit test * Spring Remote example based on Burlap & Hessian runs in a JUnit test * Fixed POM * Vertx / RxJava example * BAEL-776 - Vertx and RxJava * BAEL-776 - Vertx and RxJava
This commit is contained in:
parent
7df2635822
commit
4c7f165c7e
|
@ -41,3 +41,4 @@ SpringDataInjectionDemo/.mvn/wrapper/maven-wrapper.properties
|
|||
spring-call-getters-using-reflection/.mvn/wrapper/maven-wrapper.properties
|
||||
|
||||
spring-check-if-a-property-is-null/.mvn/wrapper/maven-wrapper.properties
|
||||
/vertx-and-rxjava/.vertx/
|
||||
|
|
1
pom.xml
1
pom.xml
|
@ -240,6 +240,7 @@
|
|||
<module>spring-boot-property-exp</module>
|
||||
<module>mockserver</module>
|
||||
<module>undertow</module>
|
||||
<module>vertx-and-rxjava</module>
|
||||
</modules>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>vertx-and-rxjava</artifactId>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-rx-java2</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-core</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-unit</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.25</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,71 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>vertx-and-rxjava</artifactId>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-rx-java2</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-core</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-unit</artifactId>
|
||||
<version>3.5.0.Beta1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.25</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,18 @@
|
|||
package com.baeldung;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
|
||||
class CityAndDayLength {
|
||||
|
||||
private final String city;
|
||||
private final double dayLengthInHours;
|
||||
|
||||
public CityAndDayLength(String city, long dayLengthInSeconds) {
|
||||
this.city = city;
|
||||
this.dayLengthInHours = dayLengthInSeconds / (60.0 * 60.0);
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return MessageFormat.format("In {0} there are {1,number,#0.0} hours of light.", city, dayLengthInHours);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.baeldung;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import io.vertx.core.http.RequestOptions;
|
||||
import io.vertx.reactivex.core.http.HttpClient;
|
||||
import io.vertx.reactivex.core.http.HttpClientRequest;
|
||||
import io.vertx.reactivex.core.http.HttpClientResponse;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
public class MetaWeatherClient {
|
||||
|
||||
private static RequestOptions metawether = new RequestOptions()
|
||||
.setHost("www.metaweather.com")
|
||||
.setPort(443)
|
||||
.setSsl(true);
|
||||
|
||||
/**
|
||||
* @return A flowable backed by vertx that automatically sends an HTTP request at soon as the first subscription is received.
|
||||
*/
|
||||
private static Flowable<HttpClientResponse> autoPerformingReq(HttpClient httpClient, String uri) {
|
||||
HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri));
|
||||
return req.toFlowable()
|
||||
.doOnSubscribe(subscription -> req.end());
|
||||
}
|
||||
|
||||
static Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
|
||||
HttpClientRequest req = httpClient.get(
|
||||
new RequestOptions()
|
||||
.setHost("www.metaweather.com")
|
||||
.setPort(443)
|
||||
.setSsl(true)
|
||||
.setURI(format("/api/location/search/?query=%s", cityName)));
|
||||
return req
|
||||
.toFlowable()
|
||||
.doOnSubscribe(subscription -> req.end());
|
||||
}
|
||||
|
||||
static Flowable<HttpClientResponse> getDataByPlaceId(HttpClient httpClient, long placeId) {
|
||||
return autoPerformingReq(
|
||||
httpClient,
|
||||
format("/api/location/%s/", placeId));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package com.baeldung;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import io.vertx.reactivex.core.Vertx;
|
||||
import io.vertx.reactivex.core.buffer.Buffer;
|
||||
import io.vertx.reactivex.core.file.FileSystem;
|
||||
import io.vertx.reactivex.core.http.HttpClient;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
import static com.baeldung.MetaWeatherClient.getDataByPlaceId;
|
||||
import static com.baeldung.MetaWeatherClient.searchByCityName;
|
||||
|
||||
public class VertxWithRxJavaTest {
|
||||
|
||||
private Vertx vertx;
|
||||
private HttpClient httpClient;
|
||||
private FileSystem fileSystem;
|
||||
static Logger log = LoggerFactory.getLogger(VertxWithRxJavaTest.class);
|
||||
|
||||
@Before public void setUp() {
|
||||
vertx = io.vertx.reactivex.core.Vertx.vertx();
|
||||
httpClient = vertx.createHttpClient();
|
||||
fileSystem = vertx.fileSystem();
|
||||
}
|
||||
|
||||
@After public void tearDown() {
|
||||
vertx.close();
|
||||
}
|
||||
|
||||
@Test public void lightLengthTest() throws InterruptedException {
|
||||
|
||||
// read the file that contains one city name per line
|
||||
fileSystem
|
||||
.rxReadFile("cities.txt").toFlowable()
|
||||
.doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
|
||||
|
||||
// split file content in lines to obtain one city per line
|
||||
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
|
||||
.doOnNext(city -> log.info("City from file: '{}'", city))
|
||||
|
||||
// discard cities that are commented out with a leading '#'
|
||||
.filter(city -> !city.startsWith("#"))
|
||||
.doOnNext(city -> log.info("City that survived filtering: '{}'", city))
|
||||
|
||||
// for each city sends a request to obtain its 'woeid'
|
||||
// and collect the buffer from the answer
|
||||
.flatMap(city -> searchByCityName(httpClient, city) )
|
||||
.flatMap(response -> response.toFlowable())
|
||||
.doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
|
||||
|
||||
// get the woeid of each city
|
||||
.map(cityBuffer -> cityBuffer
|
||||
.toJsonArray()
|
||||
.getJsonObject(0)
|
||||
.getLong("woeid"))
|
||||
|
||||
// use the id to ask for data
|
||||
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
|
||||
.flatMap(response -> response
|
||||
.toObservable()
|
||||
.reduce(
|
||||
Buffer.buffer(),
|
||||
(total, newBuf) -> total.appendBuffer( newBuf )).toFlowable() )
|
||||
|
||||
// get the JSON object out of the response
|
||||
.doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
|
||||
.map(buffer -> buffer.toJsonObject())
|
||||
|
||||
// map the JSON in a POJO
|
||||
.map(json -> {
|
||||
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
|
||||
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
|
||||
String cityName = json.getString("title");
|
||||
return new CityAndDayLength(
|
||||
cityName,sunSet.toEpochSecond() - sunRise.toEpochSecond());
|
||||
})
|
||||
|
||||
// consume the events
|
||||
.subscribe(
|
||||
cityAndDayLength -> System.out.println(cityAndDayLength),
|
||||
ex -> ex.printStackTrace());
|
||||
|
||||
// enough to give time to complete the execution
|
||||
Thread.sleep(20000);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
Milan
|
||||
Chicago
|
||||
Cairo
|
||||
Santiago
|
||||
Moscow
|
||||
Auckland
|
|
@ -0,0 +1,12 @@
|
|||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>[%thread{32}] %-5level %msg - %logger%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="warn">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
|
||||
</configuration>
|
Loading…
Reference in New Issue