Async clients to support scheduled (delayed) re-execution of requests
This commit is contained in:
parent
656d0dd4f3
commit
13137eb6c7
|
@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
|
|||
scope.originalRequest,
|
||||
new ComplexFuture<>(null),
|
||||
HttpClientContext.create(),
|
||||
scope.execRuntime.fork());
|
||||
scope.execRuntime.fork(),
|
||||
scope.scheduler,
|
||||
scope.execCount);
|
||||
cacheRevalidator.revalidateCacheEntry(
|
||||
responseCache.generateKey(target, request, entry),
|
||||
asyncExecCallback,
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
package org.apache.hc.client5.http.async;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hc.client5.http.HttpRoute;
|
||||
import org.apache.hc.client5.http.protocol.HttpClientContext;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException;
|
|||
import org.apache.hc.core5.http.HttpRequest;
|
||||
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
|
||||
import org.apache.hc.core5.util.Args;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
|
||||
/**
|
||||
* Represents a single element in the client side asynchronous request execution chain.
|
||||
|
@ -59,7 +61,36 @@ public interface AsyncExecChain {
|
|||
public final CancellableDependency cancellableDependency;
|
||||
public final HttpClientContext clientContext;
|
||||
public final AsyncExecRuntime execRuntime;
|
||||
public final Scheduler scheduler;
|
||||
public final AtomicInteger execCount;
|
||||
|
||||
/**
|
||||
* @since 5.1
|
||||
*/
|
||||
public Scope(
|
||||
final String exchangeId,
|
||||
final HttpRoute route,
|
||||
final HttpRequest originalRequest,
|
||||
final CancellableDependency cancellableDependency,
|
||||
final HttpClientContext clientContext,
|
||||
final AsyncExecRuntime execRuntime,
|
||||
final Scheduler scheduler,
|
||||
final AtomicInteger execCount) {
|
||||
this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
|
||||
this.route = Args.notNull(route, "Route");
|
||||
this.originalRequest = Args.notNull(originalRequest, "Original request");
|
||||
this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
|
||||
this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
|
||||
this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
|
||||
this.scheduler = scheduler;
|
||||
this.execCount = execCount != null ? execCount : new AtomicInteger(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link Scope#Scope(String, HttpRoute, HttpRequest, CancellableDependency, HttpClientContext,
|
||||
* AsyncExecRuntime, Scheduler, AtomicInteger)}
|
||||
*/
|
||||
@Deprecated
|
||||
public Scope(
|
||||
final String exchangeId,
|
||||
final HttpRoute route,
|
||||
|
@ -67,16 +98,38 @@ public interface AsyncExecChain {
|
|||
final CancellableDependency cancellableDependency,
|
||||
final HttpClientContext clientContext,
|
||||
final AsyncExecRuntime execRuntime) {
|
||||
this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
|
||||
this.route = Args.notNull(route, "Route");
|
||||
this.originalRequest = Args.notNull(originalRequest, "Original request");
|
||||
this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
|
||||
this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
|
||||
this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
|
||||
this(exchangeId, route, originalRequest, cancellableDependency, clientContext, execRuntime,
|
||||
null, new AtomicInteger(1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Request execution scheduler
|
||||
*
|
||||
* @since 5.1
|
||||
*/
|
||||
interface Scheduler {
|
||||
|
||||
/**
|
||||
* Schedules request re-execution immediately or after a delay.
|
||||
* @param request the actual request.
|
||||
* @param entityProducer the request entity producer or {@code null} if the request
|
||||
* does not enclose an entity.
|
||||
* @param scope the execution scope .
|
||||
* @param asyncExecCallback the execution callback.
|
||||
* @param delay re-execution delay. Can be {@code null} if the request is to be
|
||||
* re-executed immediately.
|
||||
*/
|
||||
void scheduleExecution(
|
||||
HttpRequest request,
|
||||
AsyncEntityProducer entityProducer,
|
||||
AsyncExecChain.Scope scope,
|
||||
AsyncExecCallback asyncExecCallback,
|
||||
TimeValue delay);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Proceeds to the next element in the request execution chain.
|
||||
*
|
||||
|
|
|
@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements AsyncExecChainHandler {
|
|||
proxyAuthExchange.reset();
|
||||
}
|
||||
}
|
||||
state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute,
|
||||
scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime);
|
||||
state.currentScope = new AsyncExecChain.Scope(
|
||||
scope.exchangeId,
|
||||
newRoute,
|
||||
scope.originalRequest,
|
||||
scope.cancellableDependency,
|
||||
scope.clientContext,
|
||||
scope.execRuntime,
|
||||
scope.scheduler,
|
||||
scope.execCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,9 +32,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hc.client5.http.HttpRoute;
|
||||
import org.apache.hc.client5.http.async.AsyncExecCallback;
|
||||
|
@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore;
|
|||
import org.apache.hc.client5.http.impl.ExecSupport;
|
||||
import org.apache.hc.client5.http.protocol.HttpClientContext;
|
||||
import org.apache.hc.client5.http.routing.RoutingSupport;
|
||||
import org.apache.hc.core5.concurrent.Cancellable;
|
||||
import org.apache.hc.core5.concurrent.ComplexFuture;
|
||||
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
|
||||
import org.apache.hc.core5.concurrent.FutureCallback;
|
||||
import org.apache.hc.core5.http.EntityDetails;
|
||||
import org.apache.hc.core5.http.HttpException;
|
||||
|
@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder;
|
|||
import org.apache.hc.core5.io.CloseMode;
|
||||
import org.apache.hc.core5.io.ModalCloseable;
|
||||
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
|
||||
|
||||
private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
|
||||
private final AsyncExecChainElement execChain;
|
||||
private final Lookup<CookieSpecFactory> cookieSpecRegistry;
|
||||
|
@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
|
|||
private final CredentialsProvider credentialsProvider;
|
||||
private final RequestConfig defaultConfig;
|
||||
private final ConcurrentLinkedQueue<Closeable> closeables;
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
InternalAbstractHttpAsyncClient(
|
||||
final DefaultConnectingIOReactor ioReactor,
|
||||
|
@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
|
|||
this.credentialsProvider = credentialsProvider;
|
||||
this.defaultConfig = defaultConfig;
|
||||
this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
|
||||
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
|
|||
}
|
||||
}
|
||||
}
|
||||
final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
|
||||
for (final Runnable runnable: runnables) {
|
||||
if (runnable instanceof Cancellable) {
|
||||
((Cancellable) runnable).cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setupContext(final HttpClientContext context) {
|
||||
|
@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
|
|||
clientContext.setExchangeId(exchangeId);
|
||||
setupContext(clientContext);
|
||||
|
||||
final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
|
||||
|
||||
@Override
|
||||
public void scheduleExecution(final HttpRequest request,
|
||||
final AsyncEntityProducer entityProducer,
|
||||
final AsyncExecChain.Scope scope,
|
||||
final AsyncExecCallback asyncExecCallback,
|
||||
final TimeValue delay) {
|
||||
executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
|
||||
clientContext, execRuntime);
|
||||
clientContext, execRuntime, scheduler, new AtomicInteger(1));
|
||||
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
|
||||
execChain.execute(
|
||||
executeImmediate(
|
||||
BasicRequestBuilder.copy(request).build(),
|
||||
entityDetails != null ? new AsyncEntityProducer() {
|
||||
|
||||
|
@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
|
|||
return future;
|
||||
}
|
||||
|
||||
void executeImmediate(
|
||||
final HttpRequest request,
|
||||
final AsyncEntityProducer entityProducer,
|
||||
final AsyncExecChain.Scope scope,
|
||||
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
|
||||
execChain.execute(request, entityProducer, scope, asyncExecCallback);
|
||||
}
|
||||
|
||||
void executeScheduled(
|
||||
final HttpRequest request,
|
||||
final AsyncEntityProducer entityProducer,
|
||||
final AsyncExecChain.Scope scope,
|
||||
final AsyncExecCallback asyncExecCallback,
|
||||
final TimeValue delay) {
|
||||
final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
|
||||
request, entityProducer, scope, asyncExecCallback, delay);
|
||||
if (TimeValue.isPositive(delay)) {
|
||||
scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
|
||||
} else {
|
||||
scheduledExecutorService.execute(scheduledTask);
|
||||
}
|
||||
}
|
||||
|
||||
class ScheduledRequestExecution implements Runnable, Cancellable {
|
||||
|
||||
final HttpRequest request;
|
||||
final AsyncEntityProducer entityProducer;
|
||||
final AsyncExecChain.Scope scope;
|
||||
final AsyncExecCallback asyncExecCallback;
|
||||
final TimeValue delay;
|
||||
|
||||
ScheduledRequestExecution(final HttpRequest request,
|
||||
final AsyncEntityProducer entityProducer,
|
||||
final AsyncExecChain.Scope scope,
|
||||
final AsyncExecCallback asyncExecCallback,
|
||||
final TimeValue delay) {
|
||||
this.request = request;
|
||||
this.entityProducer = entityProducer;
|
||||
this.scope = scope;
|
||||
this.asyncExecCallback = asyncExecCallback;
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
execChain.execute(request, entityProducer, scope, asyncExecCallback);
|
||||
} catch (final Exception ex) {
|
||||
asyncExecCallback.failed(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel() {
|
||||
asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue