From 9eb00018cefc94ce405a54ff48bb4bbd2d2d90aa Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Thu, 20 Sep 2018 17:16:06 +0200 Subject: [PATCH] Async clients to support request specific push consumers --- .../client5/http/async/HttpAsyncClient.java | 19 +++-------- .../AbstractMinimalHttpAsyncClientBase.java | 12 +++++-- .../impl/async/CloseableHttpAsyncClient.java | 16 +++++++-- .../InternalAbstractHttpAsyncClient.java | 7 ++-- .../impl/async/InternalHttp2AsyncClient.java | 6 ++-- .../async/InternalHttp2AsyncExecRuntime.java | 15 ++++++--- .../impl/async/InternalHttpAsyncClient.java | 6 ++-- .../async/InternalHttpAsyncExecRuntime.java | 9 +++-- .../impl/async/MinimalHttp2AsyncClient.java | 13 ++++++-- .../impl/async/MinimalHttpAsyncClient.java | 4 ++- .../PoolingAsyncClientConnectionManager.java | 9 +++-- .../http/nio/AsyncConnectionEndpoint.java | 33 +++++++++++++++++-- 12 files changed, 110 insertions(+), 39 deletions(-) diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java index 141396841..3e1c68445 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java @@ -29,10 +29,10 @@ package org.apache.hc.client5.http.async; import java.util.concurrent.Future; import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.protocol.HttpContext; /** @@ -59,25 +59,16 @@ public interface HttpAsyncClient { * @param the result type of request execution. * @param requestProducer request producer callback. * @param responseConsumer response consumer callback. - * @param context HTTP context - * @param callback future callback. + * @param pushHandlerFactory the push handler factory. Optional and may be {@code null}. + * @param context HTTP context. Optional and may be {@code null}. + * @param callback future callback. Optional and may be {@code null}. * @return future representing pending completion of the operation. */ Future execute( AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer, + HandlerFactory pushHandlerFactory, HttpContext context, FutureCallback callback); - /** - * Registers {@link AsyncPushConsumer} for the given host and the URI pattern. - * - * @param hostname the name of the host this consumer intended for. - * Can be {@code null} if applies to all hosts - * @param uriPattern URI request pattern - * @param supplier supplier that will be used to supply a consumer instance - * for the given combination of hostname and URI pattern. - */ - void register(String hostname, String uriPattern, Supplier supplier); - } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java index 3c0505f43..0ff5459a8 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java @@ -34,8 +34,10 @@ import org.apache.hc.core5.concurrent.Cancellable; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; @@ -53,6 +55,7 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final HandlerFactory pushHandlerFactory, final HttpContext context, final FutureCallback callback) { final ComplexFuture future = new ComplexFuture<>(callback); @@ -76,14 +79,17 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien future.cancel(); } - }))); + }), pushHandlerFactory, context)); return future; } public final Cancellable execute(final AsyncClientExchangeHandler exchangeHandler) { - return execute(exchangeHandler, HttpClientContext.create()); + return execute(exchangeHandler, null, HttpClientContext.create()); } - public abstract Cancellable execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context); + public abstract Cancellable execute( + AsyncClientExchangeHandler exchangeHandler, + HandlerFactory pushHandlerFactory, + HttpContext context); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java index 2e82ee1d7..65dbe51b0 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java @@ -74,12 +74,16 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final HttpContext context, final FutureCallback callback) { - return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback); + return execute(requestProducer, responseConsumer, null, context, callback); } - public final void register(final String uriPattern, final Supplier supplier) { - register(null, uriPattern, supplier); + public final Future execute( + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final FutureCallback callback) { + return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback); } public final Future execute( @@ -115,4 +119,10 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close return execute(request, HttpClientContext.create(), callback); } + public abstract void register(String hostname, String uriPattern, Supplier supplier); + + public final void register(final String uriPattern, final Supplier supplier) { + register(null, uriPattern, supplier); + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java index cf932347c..3fdbe55e3 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java @@ -57,9 +57,11 @@ import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.nio.AsyncDataConsumer; import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.RequestChannel; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; @@ -127,7 +129,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa } } - abstract AsyncExecRuntime crerateAsyncExecRuntime(); + abstract AsyncExecRuntime crerateAsyncExecRuntime(HandlerFactory pushHandlerFactory); abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException; @@ -135,6 +137,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa public Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final HandlerFactory pushHandlerFactory, final HttpContext context, final FutureCallback callback) { ensureRunning(); @@ -158,7 +161,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa } final HttpRoute route = determineRoute(request, clientContext); final String exchangeId = ExecSupport.getNextExchangeId(); - final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime(); + final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime(pushHandlerFactory); if (log.isDebugEnabled()) { log.debug(exchangeId + ": preparing request execution"); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java index 8be84b5c1..af8054254 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java @@ -43,6 +43,8 @@ import org.apache.hc.client5.http.routing.RoutingSupport; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.config.Lookup; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.nio.pool.H2ConnPool; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; @@ -71,8 +73,8 @@ class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient { } @Override - AsyncExecRuntime crerateAsyncExecRuntime() { - return new InternalHttp2AsyncExecRuntime(log, connPool); + AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { + return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory); } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java index f5f3112e7..715222dd1 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java @@ -41,6 +41,8 @@ import org.apache.hc.core5.concurrent.ComplexCancellable; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; import org.apache.hc.core5.http2.nio.pool.H2ConnPool; import org.apache.hc.core5.io.CloseMode; @@ -53,13 +55,18 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime { private final Logger log; private final H2ConnPool connPool; + private final HandlerFactory pushHandlerFactory; private final AtomicReference sessionRef; private volatile boolean reusable; - InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) { + InternalHttp2AsyncExecRuntime( + final Logger log, + final H2ConnPool connPool, + final HandlerFactory pushHandlerFactory) { super(); this.log = log; this.connPool = connPool; + this.pushHandlerFactory = pushHandlerFactory; this.sessionRef = new AtomicReference<>(null); } @@ -198,7 +205,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime { log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler)); } session.enqueue( - new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context), + new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } else { final HttpHost target = endpoint.target; @@ -213,7 +220,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime { log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler)); } session.enqueue( - new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context), + new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } @@ -255,7 +262,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime { @Override public AsyncExecRuntime fork() { - return new InternalHttp2AsyncExecRuntime(log, connPool); + return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java index eed92f1df..b0e46d02c 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java @@ -46,6 +46,8 @@ import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.config.Lookup; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; @@ -77,8 +79,8 @@ class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient { } @Override - AsyncExecRuntime crerateAsyncExecRuntime() { - return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy); + AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { + return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), pushHandlerFactory, versionPolicy); } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index 6145751bf..5f513dad7 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -42,6 +42,8 @@ import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.Cancellable; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.reactor.ConnectionInitiator; import org.apache.hc.core5.util.TimeValue; @@ -52,6 +54,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime { private final Logger log; private final AsyncClientConnectionManager manager; private final ConnectionInitiator connectionInitiator; + private final HandlerFactory pushHandlerFactory; private final HttpVersionPolicy versionPolicy; private final AtomicReference endpointRef; private volatile boolean reusable; @@ -62,11 +65,13 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime { final Logger log, final AsyncClientConnectionManager manager, final ConnectionInitiator connectionInitiator, + final HandlerFactory pushHandlerFactory, final HttpVersionPolicy versionPolicy) { super(); this.log = log; this.manager = manager; this.connectionInitiator = connectionInitiator; + this.pushHandlerFactory = pushHandlerFactory; this.versionPolicy = versionPolicy; this.endpointRef = new AtomicReference<>(null); this.validDuration = TimeValue.NEG_ONE_MILLISECONDS; @@ -252,7 +257,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime { log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler)); } try { - endpoint.execute(exchangeHandler, context); + endpoint.execute(exchangeHandler, pushHandlerFactory, context); } catch (final RuntimeException ex) { failed(ex); } @@ -289,7 +294,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime { @Override public AsyncExecRuntime fork() { - return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, versionPolicy); + return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java index 570ff7af3..20b2508c5 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java @@ -53,8 +53,10 @@ import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.CapacityChannel; import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.RequestChannel; import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; import org.apache.hc.core5.http.nio.command.ShutdownCommand; @@ -113,6 +115,7 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien @Override public Cancellable execute( final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, final HttpContext context) { ensureRunning(); final ComplexCancellable cancellable = new ComplexCancellable(); @@ -212,11 +215,17 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien session.enqueue( new RequestExecutionCommand( new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler), - null, cancellable, clientContext), + pushHandlerFactory, + cancellable, + clientContext), Command.Priority.NORMAL); } else { session.enqueue( - new RequestExecutionCommand(internalExchangeHandler, null, cancellable, clientContext), + new RequestExecutionCommand( + internalExchangeHandler, + pushHandlerFactory, + cancellable, + clientContext), Command.Priority.NORMAL); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java index 3f473a3ac..c5c29271e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java @@ -216,6 +216,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient @Override public Cancellable execute( final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, final HttpContext context) { ensureRunning(); final ComplexCancellable cancellable = new ComplexCancellable(); @@ -362,7 +363,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient } }; - endpoint.execute(internalExchangeHandler, clientContext); + endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext); } @Override @@ -420,6 +421,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId); connectionEndpoint.execute( new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler), + pushHandlerFactory, context); } else { connectionEndpoint.execute(exchangeHandler, context); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index 11675923e..3c28d7267 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -57,6 +57,8 @@ import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.http.protocol.HttpContext; @@ -553,13 +555,16 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio } @Override - public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) { + public void execute( + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection(); if (log.isDebugEnabled()) { log.debug(id + ": executing exchange " + ConnPoolSupport.getId(exchangeHandler) + " over " + ConnPoolSupport.getId(connection)); } - connection.submitCommand(new RequestExecutionCommand(exchangeHandler, context)); + connection.submitCommand(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context)); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java index 47767aaba..692863db3 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java @@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; @@ -50,11 +52,21 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext; @Contract(threading = ThreadingBehavior.SAFE) public abstract class AsyncConnectionEndpoint implements Closeable { - public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context); + public abstract void execute( + AsyncClientExchangeHandler exchangeHandler, + HandlerFactory pushHandlerFactory, + HttpContext context); + + public void execute( + final AsyncClientExchangeHandler exchangeHandler, + final HttpContext context) { + execute(exchangeHandler, null, context); + } public Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final HandlerFactory pushHandlerFactory, final HttpContext context, final FutureCallback callback) { final BasicFuture future = new BasicFuture<>(callback); @@ -77,6 +89,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable { } }), + pushHandlerFactory, context != null ? context : HttpCoreContext.create()); return future; } @@ -84,8 +97,24 @@ public abstract class AsyncConnectionEndpoint implements Closeable { public Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final HttpContext context, final FutureCallback callback) { - return execute(requestProducer, responseConsumer, null, callback); + return execute(requestProducer, responseConsumer, null, context, callback); + } + + public Future execute( + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final HandlerFactory pushHandlerFactory, + final FutureCallback callback) { + return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback); + } + + public Future execute( + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final FutureCallback callback) { + return execute(requestProducer, responseConsumer, null, null, callback); } public abstract boolean isConnected();