From 82a993244664f40b0a21bcddce3b31e67ecef661 Mon Sep 17 00:00:00 2001 From: Ryan Schmitt Date: Fri, 6 Dec 2019 15:13:11 -0800 Subject: [PATCH] Add reactive test coverage This commit adds test coverage for the `:httpcore5-reactive` bindings, using different types of clients and protocols. --- httpclient5-testing/pom.xml | 11 + .../AbstractHttpReactiveFundamentalsTest.java | 315 ++++++++++++++++++ .../testing/async/AbstractServerTestBase.java | 18 +- .../testing/async/TestH2MinimalReactive.java | 72 ++++ .../client5/testing/async/TestH2Reactive.java | 87 +++++ .../testing/async/TestHttp1Reactive.java | 207 ++++++++++++ .../async/TestHttpMinimalReactive.java | 148 ++++++++ 7 files changed, 856 insertions(+), 2 deletions(-) create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2MinimalReactive.java create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2Reactive.java create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Reactive.java create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpMinimalReactive.java diff --git a/httpclient5-testing/pom.xml b/httpclient5-testing/pom.xml index 1a7c9697d..31e286cde 100644 --- a/httpclient5-testing/pom.xml +++ b/httpclient5-testing/pom.xml @@ -42,6 +42,11 @@ httpcore5-testing compile + + org.apache.httpcomponents.core5 + httpcore5-reactive + test + org.apache.httpcomponents.client5 httpclient5 @@ -93,6 +98,12 @@ mockito-core test + + io.reactivex.rxjava2 + rxjava + ${rxjava.version} + test + diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java new file mode 100644 index 000000000..f20ea55b5 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java @@ -0,0 +1,315 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.testing.reactive.ReactiveTestUtils; +import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription; +import org.apache.hc.core5.util.TextUtils; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; + +import io.reactivex.Flowable; +import io.reactivex.functions.Consumer; +import io.reactivex.schedulers.Schedulers; + +public abstract class AbstractHttpReactiveFundamentalsTest extends AbstractIntegrationTestBase { + + public AbstractHttpReactiveFundamentalsTest(final URIScheme scheme) { + super(scheme); + } + + @Override + protected final boolean isReactive() { + return true; + } + + @Test(timeout = 60_000) + public void testSequentialGetRequests() throws Exception { + final HttpHost target = start(); + for (int i = 0; i < 3; i++) { + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + httpclient.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null); + + final Message> response = consumer.getResponseFuture().get(); + Assert.assertThat(response, CoreMatchers.notNullValue()); + Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200)); + + final String body = publisherToString(response.getBody()); + Assert.assertThat(body, CoreMatchers.notNullValue()); + Assert.assertThat(body.length(), CoreMatchers.equalTo(2048)); + } + } + + @Test(timeout = 2000) + public void testSequentialHeadRequests() throws Exception { + final HttpHost target = start(); + for (int i = 0; i < 3; i++) { + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + httpclient.execute(AsyncRequestBuilder.head(target + "/random/2048").build(), consumer, null); + + final Message> response = consumer.getResponseFuture().get(); + Assert.assertThat(response, CoreMatchers.notNullValue()); + Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200)); + + final String body = publisherToString(response.getBody()); + Assert.assertThat(body, CoreMatchers.nullValue()); + } + } + + @Test(timeout = 60_000) + public void testSequentialPostRequests() throws Exception { + final HttpHost target = start(); + for (int i = 0; i < 3; i++) { + final byte[] b1 = new byte[1024]; + final Random rnd = new Random(System.currentTimeMillis()); + rnd.nextBytes(b1); + final Flowable publisher = Flowable.just(ByteBuffer.wrap(b1)); + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + final AsyncRequestProducer request = AsyncRequestBuilder.post(target + "/echo/") + .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.APPLICATION_OCTET_STREAM, null)) + .build(); + + httpclient.execute(request, consumer, HttpClientContext.create(), null); + + final Future>> responseFuture = consumer.getResponseFuture(); + final Message> responseMessage = responseFuture.get(); + Assert.assertThat(responseMessage, CoreMatchers.notNullValue()); + final HttpResponse response = responseMessage.getHead(); + Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200)); + final byte[] b2 = publisherToByteArray(responseMessage.getBody()); + Assert.assertThat(b1, CoreMatchers.equalTo(b2)); + } + } + + @Test(timeout = 60_000) + public void testConcurrentPostRequests() throws Exception { + final HttpHost target = start(); + + final int reqCount = 500; + final int maxSize = 128 * 1024; + final Map testCases = StreamingTestCase.generate(reqCount, maxSize); + final BlockingQueue responses = new ArrayBlockingQueue<>(reqCount); + + for (final StreamingTestCase testCase : testCases.values()) { + final ReactiveEntityProducer producer = new ReactiveEntityProducer(testCase.stream, testCase.length, + ContentType.APPLICATION_OCTET_STREAM, null); + final AsyncRequestProducer request = AsyncRequestBuilder.post(target + "/echo/") + .setEntity(producer) + .build(); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(new FutureCallback>>() { + public void completed(final Message> result) { + final Flowable flowable = Flowable.fromPublisher(result.getBody()) + .observeOn(Schedulers.io()); // Stream the data on an RxJava scheduler, not a client thread + ReactiveTestUtils.consumeStream(flowable) + .subscribe(new Consumer() { + @Override + public void accept(final StreamDescription streamDescription) { + responses.add(streamDescription); + } + }); + } + public void failed(final Exception ex) { } + public void cancelled() { } + }); + httpclient.execute(request, consumer, HttpClientContext.create(), null); + } + + for (int i = 0; i < reqCount; i++) { + final StreamDescription streamDescription = responses.take(); + final StreamingTestCase streamingTestCase = testCases.get(streamDescription.length); + final long expectedLength = streamingTestCase.length; + final long actualLength = streamDescription.length; + Assert.assertEquals(expectedLength, actualLength); + + final String expectedHash = streamingTestCase.expectedHash.get(); + final String actualHash = TextUtils.toHexString(streamDescription.md.digest()); + Assert.assertEquals(expectedHash, actualHash); + } + } + + @Test(timeout = 60_000) + public void testRequestExecutionFromCallback() throws Exception { + final HttpHost target = start(); + final int requestNum = 50; + final AtomicInteger count = new AtomicInteger(requestNum); + final Queue>> resultQueue = new ConcurrentLinkedQueue<>(); + final CountDownLatch countDownLatch = new CountDownLatch(requestNum); + + final FutureCallback>> callback = new FutureCallback>>() { + @Override + public void completed(final Message> result) { + try { + resultQueue.add(result); + if (count.decrementAndGet() > 0) { + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(this); + httpclient.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null); + } + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void failed(final Exception ex) { + countDownLatch.countDown(); + } + + @Override + public void cancelled() { + countDownLatch.countDown(); + } + }; + + final int threadNum = 5; + final ExecutorService executorService = Executors.newFixedThreadPool(threadNum); + for (int i = 0; i < threadNum; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + if (!Thread.currentThread().isInterrupted()) { + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(callback); + httpclient.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null); + } + } + }); + } + + Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true)); + + executorService.shutdownNow(); + executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + for (;;) { + final Message> response = resultQueue.poll(); + if (response == null) { + break; + } + Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200)); + } + } + + @Test + public void testBadRequest() throws Exception { + final HttpHost target = start(); + final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/boom").build(); + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + httpclient.execute(request, consumer, null); + + final Future>> future = consumer.getResponseFuture(); + final HttpResponse response = future.get().getHead(); + Assert.assertThat(response, CoreMatchers.notNullValue()); + Assert.assertThat(response.getCode(), CoreMatchers.equalTo(400)); + } + + static String publisherToString(final Publisher publisher) throws Exception { + final byte[] bytes = publisherToByteArray(publisher); + if (bytes == null) { + return null; + } + return new String(bytes, StandardCharsets.UTF_8); + } + + static byte[] publisherToByteArray(final Publisher publisher) throws Exception { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (WritableByteChannel channel = Channels.newChannel(baos)) { + final List bufs = Flowable.fromPublisher(publisher) + .toList() + .blockingGet(); + if (bufs.isEmpty()) { + return null; + } + for (final ByteBuffer buf : bufs) { + channel.write(buf); + } + } + return baos.toByteArray(); + } + + private static final class StreamingTestCase { + final long length; + final AtomicReference expectedHash; + final Flowable stream; + + StreamingTestCase(final long length, final AtomicReference expectedHash, final Flowable stream) { + this.length = length; + this.expectedHash = expectedHash; + this.stream = stream; + } + + static Map generate(final int numTestCases, final int maxSize) { + final Map testCases = new LinkedHashMap<>(); + int testCaseNum = 0; + while (testCases.size() < numTestCases) { + final long seed = 198723L * testCaseNum++; + final int length = 1 + new Random(seed).nextInt(maxSize); + final AtomicReference expectedHash = new AtomicReference<>(null); + final Flowable stream = ReactiveTestUtils.produceStream(length, expectedHash); + final StreamingTestCase streamingTestCase = new StreamingTestCase(length, expectedHash, stream); + testCases.put((long) length, streamingTestCase); + } + return testCases; + } + } +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractServerTestBase.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractServerTestBase.java index 49671bd06..f281db2fe 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractServerTestBase.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractServerTestBase.java @@ -31,8 +31,11 @@ import org.apache.hc.client5.testing.SSLTestContexts; import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler; import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.testing.nio.H2TestServer; +import org.apache.hc.core5.testing.reactive.ReactiveEchoProcessor; +import org.apache.hc.core5.testing.reactive.ReactiveRandomProcessor; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; import org.junit.Rule; @@ -69,7 +72,11 @@ public abstract class AbstractServerTestBase { @Override public AsyncServerExchangeHandler get() { - return new AsyncEchoHandler(); + if (isReactive()) { + return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor()); + } else { + return new AsyncEchoHandler(); + } } }); @@ -77,7 +84,11 @@ public abstract class AbstractServerTestBase { @Override public AsyncServerExchangeHandler get() { - return new AsyncRandomHandler(); + if (isReactive()) { + return new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()); + } else { + return new AsyncRandomHandler(); + } } }); @@ -93,4 +104,7 @@ public abstract class AbstractServerTestBase { }; + protected boolean isReactive() { + return false; + } } diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2MinimalReactive.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2MinimalReactive.java new file mode 100644 index 000000000..8fbcc573c --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2MinimalReactive.java @@ -0,0 +1,72 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.async.MinimalH2AsyncClient; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.testing.SSLTestContexts; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestH2MinimalReactive extends AbstractHttpReactiveFundamentalsTest { + + @Parameterized.Parameters(name = "{0}") + public static Collection protocols() { + return Arrays.asList(new Object[][]{ + { URIScheme.HTTP }, + { URIScheme.HTTPS }, + }); + } + + public TestH2MinimalReactive(final URIScheme scheme) { + super(scheme); + } + + @Override + protected MinimalH2AsyncClient createClient() throws Exception { + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(TIMEOUT) + .build(); + return HttpAsyncClients.createHttp2Minimal( + H2Config.DEFAULT, ioReactorConfig, new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext())); + } + + @Override + public HttpHost start() throws Exception { + return super.start(null, H2Config.DEFAULT); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2Reactive.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2Reactive.java new file mode 100644 index 000000000..76bd0df10 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestH2Reactive.java @@ -0,0 +1,87 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.client5.testing.SSLTestContexts; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy; +import org.apache.hc.core5.http2.config.H2Config; +import org.junit.Rule; +import org.junit.rules.ExternalResource; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestH2Reactive extends AbstractHttpReactiveFundamentalsTest { + + @Parameterized.Parameters(name = "HTTP/2 {0}") + public static Collection protocols() { + return Arrays.asList(new Object[][]{ + { URIScheme.HTTP }, + { URIScheme.HTTPS } + }); + } + + protected H2AsyncClientBuilder clientBuilder; + + @Rule + public ExternalResource clientResource = new ExternalResource() { + + @Override + protected void before() throws Throwable { + clientBuilder = H2AsyncClientBuilder.create() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectionRequestTimeout(TIMEOUT) + .setConnectTimeout(TIMEOUT) + .build()) + .setTlsStrategy(new BasicClientTlsStrategy(SSLTestContexts.createClientSSLContext())); + } + + }; + + public TestH2Reactive(final URIScheme scheme) { + super(scheme); + } + + @Override + protected CloseableHttpAsyncClient createClient() { + return clientBuilder.build(); + } + + @Override + public HttpHost start() throws Exception { + return super.start(null, H2Config.DEFAULT); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Reactive.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Reactive.java new file mode 100644 index 000000000..15f6d2cc0 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Reactive.java @@ -0,0 +1,207 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.testing.SSLTestContexts; +import org.apache.hc.core5.http.HeaderElements; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.config.Http1Config; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.reactivestreams.Publisher; + +@RunWith(Parameterized.class) +public class TestHttp1Reactive extends AbstractHttpReactiveFundamentalsTest { + + @Parameterized.Parameters(name = "HTTP/1.1 {0}") + public static Collection protocols() { + return Arrays.asList(new Object[][]{ + { URIScheme.HTTP }, + { URIScheme.HTTPS }, + }); + } + + protected HttpAsyncClientBuilder clientBuilder; + protected PoolingAsyncClientConnectionManager connManager; + + @Rule + public ExternalResource connManagerResource = new ExternalResource() { + + @Override + protected void before() throws Throwable { + connManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .build(); + } + + @Override + protected void after() { + if (connManager != null) { + connManager.close(); + connManager = null; + } + } + + }; + + @Rule + public ExternalResource clientResource = new ExternalResource() { + + @Override + protected void before() throws Throwable { + clientBuilder = HttpAsyncClientBuilder.create() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectionRequestTimeout(TIMEOUT) + .setConnectTimeout(TIMEOUT) + .build()) + .setConnectionManager(connManager); + } + + }; + + public TestHttp1Reactive(final URIScheme scheme) { + super(scheme); + } + + @Override + protected CloseableHttpAsyncClient createClient() { + return clientBuilder.build(); + } + + @Override + public HttpHost start() throws Exception { + return super.start(null, Http1Config.DEFAULT); + } + + @Test(timeout = 60_000) + public void testSequentialGetRequestsCloseConnection() throws Exception { + final HttpHost target = start(); + for (int i = 0; i < 3; i++) { + final SimpleHttpRequest get = SimpleHttpRequests.GET.create(target, "/random/2048"); + get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE); + final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/2048").build(); + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + httpclient.execute(request, consumer, null); + + final Message> response = consumer.getResponseFuture().get(); + Assert.assertThat(response, CoreMatchers.notNullValue()); + Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200)); + final String body = publisherToString(response.getBody()); + Assert.assertThat(body, CoreMatchers.notNullValue()); + Assert.assertThat(body.length(), CoreMatchers.equalTo(2048)); + } + } + + @Test(timeout = 60_000) + public void testConcurrentPostsOverMultipleConnections() throws Exception { + connManager.setDefaultMaxPerRoute(20); + connManager.setMaxTotal(100); + super.testConcurrentPostRequests(); + } + + @Test(timeout = 60_000) + public void testConcurrentPostsOverSingleConnection() throws Exception { + connManager.setDefaultMaxPerRoute(1); + connManager.setMaxTotal(100); + super.testConcurrentPostRequests(); + } + + @Test(timeout = 60_000) + public void testSharedPool() throws Exception { + final HttpHost target = start(); + final AsyncRequestProducer request1 = AsyncRequestBuilder.get(target + "/random/2048").build(); + final ReactiveResponseConsumer consumer1 = new ReactiveResponseConsumer(); + + httpclient.execute(request1, consumer1, null); + + final Message> response1 = consumer1.getResponseFuture().get(); + Assert.assertThat(response1, CoreMatchers.notNullValue()); + Assert.assertThat(response1.getHead(), CoreMatchers.notNullValue()); + Assert.assertThat(response1.getHead().getCode(), CoreMatchers.equalTo(200)); + final String body1 = publisherToString(response1.getBody()); + Assert.assertThat(body1, CoreMatchers.notNullValue()); + Assert.assertThat(body1.length(), CoreMatchers.equalTo(2048)); + + + try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom() + .setConnectionManager(connManager) + .setConnectionManagerShared(true) + .build()) { + httpclient2.start(); + final AsyncRequestProducer request2 = AsyncRequestBuilder.get(target + "/random/2048").build(); + final ReactiveResponseConsumer consumer2 = new ReactiveResponseConsumer(); + + httpclient2.execute(request2, consumer2, null); + + final Message> response2 = consumer2.getResponseFuture().get(); + Assert.assertThat(response2, CoreMatchers.notNullValue()); + Assert.assertThat(response2.getHead().getCode(), CoreMatchers.equalTo(200)); + final String body2 = publisherToString(response2.getBody()); + Assert.assertThat(body2, CoreMatchers.notNullValue()); + Assert.assertThat(body2.length(), CoreMatchers.equalTo(2048)); + } + + final AsyncRequestProducer request3 = AsyncRequestBuilder.get(target + "/random/2048").build(); + final ReactiveResponseConsumer consumer3 = new ReactiveResponseConsumer(); + + httpclient.execute(request3, consumer3, null); + + final Message> response3 = consumer3.getResponseFuture().get(); + Assert.assertThat(response3, CoreMatchers.notNullValue()); + Assert.assertThat(response3.getHead().getCode(), CoreMatchers.equalTo(200)); + final String body3 = publisherToString(response3.getBody()); + Assert.assertThat(body3, CoreMatchers.notNullValue()); + Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048)); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpMinimalReactive.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpMinimalReactive.java new file mode 100644 index 000000000..751349682 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpMinimalReactive.java @@ -0,0 +1,148 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.testing.SSLTestContexts; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.config.Http1Config; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestHttpMinimalReactive extends AbstractHttpReactiveFundamentalsTest { + + @Parameterized.Parameters(name = "Minimal {0} {1}") + public static Collection protocols() { + return Arrays.asList(new Object[][]{ + { HttpVersion.HTTP_1_1, URIScheme.HTTP }, + { HttpVersion.HTTP_1_1, URIScheme.HTTPS }, + { HttpVersion.HTTP_2, URIScheme.HTTP }, + { HttpVersion.HTTP_2, URIScheme.HTTPS } + }); + } + + protected final HttpVersion version; + + public TestHttpMinimalReactive(final HttpVersion version, final URIScheme scheme) { + super(scheme); + this.version = version; + } + + @Override + protected MinimalHttpAsyncClient createClient() throws Exception { + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .build(); + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(TIMEOUT) + .build(); + if (version.greaterEquals(HttpVersion.HTTP_2)) { + return HttpAsyncClients.createMinimal( + HttpVersionPolicy.FORCE_HTTP_2, H2Config.DEFAULT, Http1Config.DEFAULT, ioReactorConfig, connectionManager); + } else { + return HttpAsyncClients.createMinimal( + HttpVersionPolicy.FORCE_HTTP_1, H2Config.DEFAULT, Http1Config.DEFAULT, ioReactorConfig, connectionManager); + } + } + + @Override + public HttpHost start() throws Exception { + if (version.greaterEquals(HttpVersion.HTTP_2)) { + return super.start(null, H2Config.DEFAULT); + } else { + return super.start(null, Http1Config.DEFAULT); + } + } + + @Test + public void testConcurrentPostRequestsSameEndpoint() throws Exception { + final HttpHost target = start(); + final byte[] b1 = new byte[1024]; + final Random rnd = new Random(System.currentTimeMillis()); + rnd.nextBytes(b1); + + final int reqCount = 20; + + final Future endpointLease = httpclient.lease(target, null); + final AsyncClientEndpoint endpoint = endpointLease.get(5, TimeUnit.SECONDS); + try { + final Queue>> queue = new LinkedList<>(); + for (int i = 0; i < reqCount; i++) { + final Future> future = endpoint.execute( + new BasicRequestProducer(Method.GET, target, "/echo/", + AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)), + new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null); + queue.add(future); + } + while (!queue.isEmpty()) { + final Future> future = queue.remove(); + final Message responseMessage = future.get(); + Assert.assertThat(responseMessage, CoreMatchers.notNullValue()); + final HttpResponse response = responseMessage.getHead(); + Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200)); + final byte[] b2 = responseMessage.getBody(); + Assert.assertThat(b1, CoreMatchers.equalTo(b2)); + endpoint.releaseAndReuse(); + } + } finally { + endpoint.releaseAndDiscard(); + } + + } + +}