From e92c75be8a89ee00686674f397cc036645e66019 Mon Sep 17 00:00:00 2001 From: Ryan Schmitt Date: Mon, 24 Sep 2018 14:20:27 -0700 Subject: [PATCH] HTTPCLIENT-1942: Add example of full-duplex reactive message exchange --- httpclient5/pom.xml | 10 ++ .../ReactiveClientFullDuplexExchange.java | 115 ++++++++++++++++++ pom.xml | 9 +- 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 httpclient5/src/examples/org/apache/hc/client5/http/examples/ReactiveClientFullDuplexExchange.java diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml index dca207d87..23082c8e5 100644 --- a/httpclient5/pom.xml +++ b/httpclient5/pom.xml @@ -52,6 +52,16 @@ slf4j-api compile + + org.apache.httpcomponents.core5 + httpcore5-reactive + test + + + io.reactivex.rxjava2 + rxjava + test + org.apache.logging.log4j log4j-slf4j-impl diff --git a/httpclient5/src/examples/org/apache/hc/client5/http/examples/ReactiveClientFullDuplexExchange.java b/httpclient5/src/examples/org/apache/hc/client5/http/examples/ReactiveClientFullDuplexExchange.java new file mode 100644 index 000000000..d81ae243b --- /dev/null +++ b/httpclient5/src/examples/org/apache/hc/client5/http/examples/ReactiveClientFullDuplexExchange.java @@ -0,0 +1,115 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import io.reactivex.Flowable; +import io.reactivex.Notification; +import io.reactivex.Observable; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.config.H1Config; +import org.apache.hc.core5.http.nio.BasicRequestProducer; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; +import org.reactivestreams.Publisher; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * This example demonstrates a reactive, full-duplex HTTP/1.1 message exchange using RxJava. + */ +public class ReactiveClientFullDuplexExchange { + + public static void main(final String[] args) throws Exception { + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(5)) + .build(); + + final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal( + HttpVersionPolicy.NEGOTIATE, + H2Config.DEFAULT, + H1Config.DEFAULT, + ioReactorConfig); + + client.start(); + + final URI requestUri = new URI("http://httpbin.org/post"); + byte[] bs = "stuff".getBytes(StandardCharsets.UTF_8); + final ReactiveEntityProducer reactiveEntityProducer = new ReactiveEntityProducer( + Flowable.just(ByteBuffer.wrap(bs)), bs.length, ContentType.TEXT_PLAIN, null); + final BasicRequestProducer requestProducer = new BasicRequestProducer( + "POST", requestUri, reactiveEntityProducer); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + final Future requestFuture = client.execute(requestProducer, consumer, null); + final Message> streamingResponse = consumer.getResponseFuture().get(); + + System.out.println(streamingResponse.getHead()); + for (Header header : streamingResponse.getHead().getAllHeaders()) { + System.out.println(header.toString()); + } + System.out.println(); + + Observable.fromPublisher(streamingResponse.getBody()) + .map(new Function() { + @Override + public String apply(final ByteBuffer byteBuffer) throws Exception { + final byte[] string = new byte[byteBuffer.remaining()]; + byteBuffer.get(string); + return new String(string); + } + }) + .materialize() + .forEach(new Consumer>() { + @Override + public void accept(final Notification byteBufferNotification) throws Exception { + System.out.println(byteBufferNotification.toString()); + } + }); + + requestFuture.get(1, TimeUnit.MINUTES); + + System.out.println("Shutting down"); + client.shutdown(CloseMode.GRACEFUL); + } +} diff --git a/pom.xml b/pom.xml index 459594146..d6b95db60 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,7 @@ 2.21.0 4.5.2 1 + 2.2.2 @@ -143,6 +144,12 @@ jna-platform ${jna.version} + + io.reactivex.rxjava2 + rxjava + ${rxjava.version} + test + junit junit @@ -389,7 +396,7 @@ src/docbkx/resources/** src/test/resources/*.truststore .checkstyle - .externalToolBuilders/** + .externalToolBuilders/** maven-eclipse.xml **/serial **/index.txt