diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java index d12215a20..93419a70d 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java @@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler scope.originalRequest, new ComplexFuture<>(null), HttpClientContext.create(), - scope.execRuntime.fork()); + scope.execRuntime.fork(), + scope.scheduler, + scope.execCount); cacheRevalidator.revalidateCacheEntry( responseCache.generateKey(target, request, entry), asyncExecCallback, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java index b2cff0bd3..081946029 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.async; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.nio.AsyncEntityProducer; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; /** * Represents a single element in the client side asynchronous request execution chain. @@ -59,7 +61,36 @@ public interface AsyncExecChain { public final CancellableDependency cancellableDependency; public final HttpClientContext clientContext; public final AsyncExecRuntime execRuntime; + public final Scheduler scheduler; + public final AtomicInteger execCount; + /** + * @since 5.1 + */ + public Scope( + final String exchangeId, + final HttpRoute route, + final HttpRequest originalRequest, + final CancellableDependency cancellableDependency, + final HttpClientContext clientContext, + final AsyncExecRuntime execRuntime, + final Scheduler scheduler, + final AtomicInteger execCount) { + this.exchangeId = Args.notBlank(exchangeId, "Exchange id"); + this.route = Args.notNull(route, "Route"); + this.originalRequest = Args.notNull(originalRequest, "Original request"); + this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency"); + this.clientContext = clientContext != null ? clientContext : HttpClientContext.create(); + this.execRuntime = Args.notNull(execRuntime, "Exec runtime"); + this.scheduler = scheduler; + this.execCount = execCount != null ? execCount : new AtomicInteger(1); + } + + /** + * @deprecated Use {@link Scope#Scope(String, HttpRoute, HttpRequest, CancellableDependency, HttpClientContext, + * AsyncExecRuntime, Scheduler, AtomicInteger)} + */ + @Deprecated public Scope( final String exchangeId, final HttpRoute route, @@ -67,16 +98,38 @@ public interface AsyncExecChain { final CancellableDependency cancellableDependency, final HttpClientContext clientContext, final AsyncExecRuntime execRuntime) { - this.exchangeId = Args.notBlank(exchangeId, "Exchange id"); - this.route = Args.notNull(route, "Route"); - this.originalRequest = Args.notNull(originalRequest, "Original request"); - this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency"); - this.clientContext = clientContext != null ? clientContext : HttpClientContext.create(); - this.execRuntime = Args.notNull(execRuntime, "Exec runtime"); + this(exchangeId, route, originalRequest, cancellableDependency, clientContext, execRuntime, + null, new AtomicInteger(1)); } } + /** + * Request execution scheduler + * + * @since 5.1 + */ + interface Scheduler { + + /** + * Schedules request re-execution immediately or after a delay. + * @param request the actual request. + * @param entityProducer the request entity producer or {@code null} if the request + * does not enclose an entity. + * @param scope the execution scope . + * @param asyncExecCallback the execution callback. + * @param delay re-execution delay. Can be {@code null} if the request is to be + * re-executed immediately. + */ + void scheduleExecution( + HttpRequest request, + AsyncEntityProducer entityProducer, + AsyncExecChain.Scope scope, + AsyncExecCallback asyncExecCallback, + TimeValue delay); + + } + /** * Proceeds to the next element in the request execution chain. * diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java index b5c7eafc7..d04944bac 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java @@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements AsyncExecChainHandler { proxyAuthExchange.reset(); } } - state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute, - scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime); + state.currentScope = new AsyncExecChain.Scope( + scope.exchangeId, + newRoute, + scope.originalRequest, + scope.cancellableDependency, + scope.clientContext, + scope.execRuntime, + scope.scheduler, + scope.execCount); } } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java index 0b08573e4..6dac2751d 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java @@ -32,9 +32,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.async.AsyncExecCallback; @@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore; import org.apache.hc.client5.http.impl.ExecSupport; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.routing.RoutingSupport; +import org.apache.hc.core5.concurrent.Cancellable; import org.apache.hc.core5.concurrent.ComplexFuture; +import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.HttpException; @@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; +import org.apache.hc.core5.util.TimeValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase { + private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor"); + private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class); private final AsyncExecChainElement execChain; private final Lookup cookieSpecRegistry; @@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa private final CredentialsProvider credentialsProvider; private final RequestConfig defaultConfig; private final ConcurrentLinkedQueue closeables; + private final ScheduledExecutorService scheduledExecutorService; InternalAbstractHttpAsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa this.credentialsProvider = credentialsProvider; this.defaultConfig = defaultConfig; this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null; + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY); } @Override @@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa } } } + final List runnables = this.scheduledExecutorService.shutdownNow(); + for (final Runnable runnable: runnables) { + if (runnable instanceof Cancellable) { + ((Cancellable) runnable).cancel(); + } + } } private void setupContext(final HttpClientContext context) { @@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa clientContext.setExchangeId(exchangeId); setupContext(clientContext); + final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() { + + @Override + public void scheduleExecution(final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecCallback asyncExecCallback, + final TimeValue delay) { + executeScheduled(request, entityProducer, scope, asyncExecCallback, delay); + } + + }; + final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future, - clientContext, execRuntime); + clientContext, execRuntime, scheduler, new AtomicInteger(1)); final AtomicBoolean outputTerminated = new AtomicBoolean(false); - execChain.execute( + executeImmediate( BasicRequestBuilder.copy(request).build(), entityDetails != null ? new AsyncEntityProducer() { @@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa return future; } + void executeImmediate( + final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecCallback asyncExecCallback) throws HttpException, IOException { + execChain.execute(request, entityProducer, scope, asyncExecCallback); + } + + void executeScheduled( + final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecCallback asyncExecCallback, + final TimeValue delay) { + final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution( + request, entityProducer, scope, asyncExecCallback, delay); + if (TimeValue.isPositive(delay)) { + scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit()); + } else { + scheduledExecutorService.execute(scheduledTask); + } + } + + class ScheduledRequestExecution implements Runnable, Cancellable { + + final HttpRequest request; + final AsyncEntityProducer entityProducer; + final AsyncExecChain.Scope scope; + final AsyncExecCallback asyncExecCallback; + final TimeValue delay; + + ScheduledRequestExecution(final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecCallback asyncExecCallback, + final TimeValue delay) { + this.request = request; + this.entityProducer = entityProducer; + this.scope = scope; + this.asyncExecCallback = asyncExecCallback; + this.delay = delay; + } + + @Override + public void run() { + try { + execChain.execute(request, entityProducer, scope, asyncExecCallback); + } catch (final Exception ex) { + asyncExecCallback.failed(ex); + } + } + + @Override + public boolean cancel() { + asyncExecCallback.failed(new CancellationException("Request execution cancelled")); + return true; + } + + } + }