Merge pull request #9278 from sampada07/JAVA-1595-branch

JAVA-1595: Restore code for Jetty ReactiveStreams HTTP Client article
This commit is contained in:
Josh Cummings 2020-05-19 19:41:24 -06:00 committed by GitHub
commit 4f6bcaf9f6
8 changed files with 308 additions and 0 deletions

View File

@ -35,6 +35,37 @@
<version>${mockwebserver.version}</version>
<scope>test</scope>
</dependency>
<!-- Dependencies for Jetty ReactiveStreams HTTP Client -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-reactive-httpclient</artifactId>
<version>${jetty.httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.server.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava2.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>${spring.webflux.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive.stream.version}</version>
</dependency>
</dependencies>
<properties>
@ -42,6 +73,12 @@
<gson.version>2.8.5</gson.version>
<mockwebserver.version>3.14.2</mockwebserver.version>
<jackson.version>2.9.8</jackson.version>
<jetty.httpclient.version>1.0.3</jetty.httpclient.version>
<jetty.server.version>9.4.19.v20190610</jetty.server.version>
<rxjava2.version>2.2.11</rxjava2.version>
<spring.webflux.version>5.1.9.RELEASE</spring.webflux.version>
<reactive.stream.version>1.0.3</reactive.stream.version>
<reactor.version>3.2.12.RELEASE</reactor.version>
</properties>
</project>

View File

@ -0,0 +1,35 @@
package com.baeldung.jetty.httpclient;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1);
}
@Override
public void onNext(ReactiveResponse response) {
sink.offer(response);
}
@Override
public void onError(Throwable failure) {
}
@Override
public void onComplete() {
}
public ReactiveResponse block() throws InterruptedException {
return sink.poll(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,21 @@
package com.baeldung.jetty.httpclient;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
public class RequestHandler extends AbstractHandler {
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
jettyRequest.setHandled(true);
response.setContentType(request.getContentType());
IO.copy(request.getInputStream(), response.getOutputStream());
}
}

View File

@ -0,0 +1,54 @@
package com.baeldung.jetty.httpclient;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.junit.After;
import org.junit.Before;
public abstract class AbstractUnitTest {
protected HttpClient httpClient;
protected Server server;
protected static final String CONTENT = "Hello World!";
protected final int port = 9080;
@Before
public void init() {
startServer(new RequestHandler());
startClient();
}
private void startClient() {
httpClient = new HttpClient();
try {
httpClient.start();
} catch (Exception e) {
e.printStackTrace();
}
}
private void startServer(Handler handler) {
server = new Server(port);
server.setHandler(handler);
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void dispose() throws Exception {
if (httpClient != null) {
httpClient.stop();
}
if (server != null) {
server.stop();
}
}
protected String uri() {
return "http://localhost:" + port;
}
}

View File

@ -0,0 +1,30 @@
package com.baeldung.jetty.httpclient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
public class ProjectReactorUnitTest extends AbstractUnitTest {
@Test
public void givenReactiveClient_whenRequested_shouldReturn200() throws Exception {
Request request = httpClient.newRequest(uri());
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();
ReactiveResponse response = Mono.from(publisher)
.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
}
}

View File

@ -0,0 +1,28 @@
package com.baeldung.jetty.httpclient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
public class ReactiveStreamsUnitTest extends AbstractUnitTest {
@Test
public void givenReactiveClient_whenRequested_shouldReturn200() throws Exception {
Request request = httpClient.newRequest(uri());
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();
BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
}
}

View File

@ -0,0 +1,67 @@
package com.baeldung.jetty.httpclient;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.ReactiveRequest.Event.Type;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import io.reactivex.Flowable;
import io.reactivex.Single;
public class RxJava2UnitTest extends AbstractUnitTest {
@Test
public void givenReactiveClient_whenRequestedWithBody_ShouldReturnBody() throws Exception {
Request request = httpClient.newRequest(uri());
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.content(ReactiveRequest.Content.fromString(CONTENT, MediaType.TEXT_PLAIN_VALUE, UTF_8))
.build();
Publisher<String> publisher = reactiveRequest.response(ReactiveResponse.Content.asString());
String responseContent = Single.fromPublisher(publisher)
.blockingGet();
Assert.assertEquals(CONTENT, responseContent);
}
@Test
public void givenReactiveClient_whenRequested_ShouldPrintEvents() throws Exception {
ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, uri())
.content(ReactiveRequest.Content.fromString(CONTENT, MediaType.TEXT_PLAIN_VALUE, UTF_8))
.build();
Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();
Publisher<ReactiveResponse.Event> responseEvents = request.responseEvents();
List<Type> requestEventTypes = new ArrayList<>();
List<ReactiveResponse.Event.Type> responseEventTypes = new ArrayList<>();
Flowable.fromPublisher(requestEvents)
.map(ReactiveRequest.Event::getType)
.subscribe(requestEventTypes::add);
Flowable.fromPublisher(responseEvents)
.map(ReactiveResponse.Event::getType)
.subscribe(responseEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());
int actualStatus = response.blockingGet()
.getStatus();
Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(5, responseEventTypes.size());
Assert.assertEquals(actualStatus, HttpStatus.OK_200);
}
}

View File

@ -0,0 +1,36 @@
package com.baeldung.jetty.httpclient;
import org.eclipse.jetty.client.HttpClient;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class SpringWebFluxUnitTest extends AbstractUnitTest {
@Test
public void givenReactiveClient_whenRequested_shouldReturnResponse() throws Exception {
HttpClient httpClient = new HttpClient();
httpClient.start();
ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);
WebClient client = WebClient.builder()
.clientConnector(clientConnector)
.build();
String responseContent = client.post()
.uri(uri())
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(Mono.just(CONTENT), String.class))
.retrieve()
.bodyToMono(String.class)
.block();
Assert.assertNotNull(responseContent);
Assert.assertEquals(CONTENT, responseContent);
}
}