Async clients to support request specific push consumers

This commit is contained in:
Oleg Kalnichevski 2018-09-20 17:16:06 +02:00
parent 7f3539c181
commit 9eb00018ce
12 changed files with 110 additions and 39 deletions

View File

@ -29,10 +29,10 @@ package org.apache.hc.client5.http.async;
import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext;
/**
@ -59,25 +59,16 @@ public interface HttpAsyncClient {
* @param <T> the result type of request execution.
* @param requestProducer request producer callback.
* @param responseConsumer response consumer callback.
* @param context HTTP context
* @param callback future callback.
* @param pushHandlerFactory the push handler factory. Optional and may be {@code null}.
* @param context HTTP context. Optional and may be {@code null}.
* @param callback future callback. Optional and may be {@code null}.
* @return future representing pending completion of the operation.
*/
<T> Future<T> execute(
AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context,
FutureCallback<T> callback);
/**
* Registers {@link AsyncPushConsumer} for the given host and the URI pattern.
*
* @param hostname the name of the host this consumer intended for.
* Can be {@code null} if applies to all hosts
* @param uriPattern URI request pattern
* @param supplier supplier that will be used to supply a consumer instance
* for the given combination of hostname and URI pattern.
*/
void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
}

View File

@ -34,8 +34,10 @@ import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@ -53,6 +55,7 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
final ComplexFuture<T> future = new ComplexFuture<>(callback);
@ -76,14 +79,17 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
future.cancel();
}
})));
}), pushHandlerFactory, context));
return future;
}
public final Cancellable execute(final AsyncClientExchangeHandler exchangeHandler) {
return execute(exchangeHandler, HttpClientContext.create());
return execute(exchangeHandler, null, HttpClientContext.create());
}
public abstract Cancellable execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
public abstract Cancellable execute(
AsyncClientExchangeHandler exchangeHandler,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context);
}

View File

@ -74,12 +74,16 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
return execute(requestProducer, responseConsumer, null, context, callback);
}
public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
register(null, uriPattern, supplier);
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
}
public final Future<SimpleHttpResponse> execute(
@ -115,4 +119,10 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
return execute(request, HttpClientContext.create(), callback);
}
public abstract void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
register(null, uriPattern, supplier);
}
}

View File

@ -57,9 +57,11 @@ import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@ -127,7 +129,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
}
}
abstract AsyncExecRuntime crerateAsyncExecRuntime();
abstract AsyncExecRuntime crerateAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException;
@ -135,6 +137,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
@ -158,7 +161,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
}
final HttpRoute route = determineRoute(request, clientContext);
final String exchangeId = ExecSupport.getNextExchangeId();
final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime();
final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime(pushHandlerFactory);
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": preparing request execution");
}

View File

@ -43,6 +43,8 @@ import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@ -71,8 +73,8 @@ class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient {
}
@Override
AsyncExecRuntime crerateAsyncExecRuntime() {
return new InternalHttp2AsyncExecRuntime(log, connPool);
AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
}
@Override

View File

@ -41,6 +41,8 @@ import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.io.CloseMode;
@ -53,13 +55,18 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final H2ConnPool connPool;
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final AtomicReference<Endpoint> sessionRef;
private volatile boolean reusable;
InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) {
InternalHttp2AsyncExecRuntime(
final Logger log,
final H2ConnPool connPool,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
super();
this.log = log;
this.connPool = connPool;
this.pushHandlerFactory = pushHandlerFactory;
this.sessionRef = new AtomicReference<>(null);
}
@ -198,7 +205,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.enqueue(
new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
} else {
final HttpHost target = endpoint.target;
@ -213,7 +220,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.enqueue(
new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
}
@ -255,7 +262,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
@Override
public AsyncExecRuntime fork() {
return new InternalHttp2AsyncExecRuntime(log, connPool);
return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
}
}

View File

@ -46,6 +46,8 @@ import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@ -77,8 +79,8 @@ class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient {
}
@Override
AsyncExecRuntime crerateAsyncExecRuntime() {
return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy);
AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), pushHandlerFactory, versionPolicy);
}
@Override

View File

@ -42,6 +42,8 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.util.TimeValue;
@ -52,6 +54,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final AsyncClientConnectionManager manager;
private final ConnectionInitiator connectionInitiator;
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final HttpVersionPolicy versionPolicy;
private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
private volatile boolean reusable;
@ -62,11 +65,13 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
final Logger log,
final AsyncClientConnectionManager manager,
final ConnectionInitiator connectionInitiator,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpVersionPolicy versionPolicy) {
super();
this.log = log;
this.manager = manager;
this.connectionInitiator = connectionInitiator;
this.pushHandlerFactory = pushHandlerFactory;
this.versionPolicy = versionPolicy;
this.endpointRef = new AtomicReference<>(null);
this.validDuration = TimeValue.NEG_ONE_MILLISECONDS;
@ -252,7 +257,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
try {
endpoint.execute(exchangeHandler, context);
endpoint.execute(exchangeHandler, pushHandlerFactory, context);
} catch (final RuntimeException ex) {
failed(ex);
}
@ -289,7 +294,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public AsyncExecRuntime fork() {
return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, versionPolicy);
return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy);
}
}

View File

@ -53,8 +53,10 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@ -113,6 +115,7 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
@Override
public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable();
@ -212,11 +215,17 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
session.enqueue(
new RequestExecutionCommand(
new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler),
null, cancellable, clientContext),
pushHandlerFactory,
cancellable,
clientContext),
Command.Priority.NORMAL);
} else {
session.enqueue(
new RequestExecutionCommand(internalExchangeHandler, null, cancellable, clientContext),
new RequestExecutionCommand(
internalExchangeHandler,
pushHandlerFactory,
cancellable,
clientContext),
Command.Priority.NORMAL);
}
}

View File

@ -216,6 +216,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
@Override
public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable();
@ -362,7 +363,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
}
};
endpoint.execute(internalExchangeHandler, clientContext);
endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
}
@Override
@ -420,6 +421,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId);
connectionEndpoint.execute(
new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler),
pushHandlerFactory,
context);
} else {
connectionEndpoint.execute(exchangeHandler, context);

View File

@ -57,6 +57,8 @@ import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
@ -553,13 +555,16 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
}
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
if (log.isDebugEnabled()) {
log.debug(id + ": executing exchange " + ConnPoolSupport.getId(exchangeHandler) +
" over " + ConnPoolSupport.getId(connection));
}
connection.submitCommand(new RequestExecutionCommand(exchangeHandler, context));
connection.submitCommand(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context));
}
}

View File

@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
@ -50,11 +52,21 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
@Contract(threading = ThreadingBehavior.SAFE)
public abstract class AsyncConnectionEndpoint implements Closeable {
public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
public abstract void execute(
AsyncClientExchangeHandler exchangeHandler,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context);
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final HttpContext context) {
execute(exchangeHandler, null, context);
}
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
final BasicFuture<T> future = new BasicFuture<>(callback);
@ -77,6 +89,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
}
}),
pushHandlerFactory,
context != null ? context : HttpCoreContext.create());
return future;
}
@ -84,8 +97,24 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, callback);
return execute(requestProducer, responseConsumer, null, context, callback);
}
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback);
}
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, null, callback);
}
public abstract boolean isConnected();