Re-use concurrent primitives from HttpCore

This commit is contained in:
Oleg Kalnichevski 2018-01-12 14:07:41 +01:00
parent dee32207e3
commit 695f353b43
19 changed files with 74 additions and 194 deletions

View File

@ -42,6 +42,7 @@ import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.Operations;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.util.Args;

View File

@ -55,6 +55,7 @@ import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.client5.http.utils.DateUtils;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
@ -201,7 +202,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
Args.notNull(scope, "Scope");
final HttpRoute route = scope.route;
final ComplexFuture<?> future = scope.future;
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpClientContext.HTTP_REQUEST, request);
@ -231,7 +232,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
if (!cacheableRequestPolicy.isServableFromCache(request)) {
log.debug("Request is not servable from cache");
future.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
@ -250,7 +251,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
}));
} else {
future.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry entry) {
@ -506,8 +507,8 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
}
void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate, final ByteArrayBuffer buffer) {
final ComplexFuture<?> future = scope.future;
future.setDependency(responseCache.createCacheEntry(
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.createCacheEntry(
target,
request,
backendResponse,
@ -548,8 +549,8 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
if (cacheConfig.isFreshnessCheckEnabled()) {
final ComplexFuture<?> future = scope.future;
future.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry existingEntry) {
@ -681,9 +682,9 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate) {
final ComplexFuture<?> future = scope.future;
final CancellableDependency operation = scope.cancellableDependency;
recordCacheUpdate(scope.clientContext);
future.setDependency(responseCache.updateCacheEntry(
operation.setDependency(responseCache.updateCacheEntry(
target,
request,
cacheEntry,
@ -857,8 +858,8 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
recordCacheMiss(target, request);
if (mayCallBackend(request)) {
final ComplexFuture<?> future = scope.future;
future.setDependency(responseCache.getVariantCacheEntriesWithEtags(
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
target,
request,
new FutureCallback<Map<String, Variant>>() {
@ -897,7 +898,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final Map<String, Variant> variants) {
final ComplexFuture<?> future = scope.future;
final CancellableDependency operation = scope.cancellableDependency;
final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(request, variants);
final Date requestDate = getCurrentDate();
@ -907,7 +908,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
void updateVariantCacheEntry(final HttpResponse backendResponse, final Date responseDate, final Variant matchingVariant) {
recordCacheUpdate(scope.clientContext);
future.setDependency(responseCache.updateVariantCacheEntry(
operation.setDependency(responseCache.updateVariantCacheEntry(
target,
conditionalRequest,
backendResponse,
@ -924,7 +925,7 @@ public class AsyncCachingExec extends CachingExecBase implements AsyncExecChainH
} else {
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
future.setDependency(responseCache.reuseVariantEntryFor(
operation.setDependency(responseCache.reuseVariantEntryFor(
target,
request,
matchingVariant,

View File

@ -42,6 +42,7 @@ import org.apache.hc.client5.http.cache.ResourceFactory;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.Operations;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;

View File

@ -1,74 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.util.Args;
/**
* TODO: replace with ComplexCancellable from HttpCore 5.0b2
*/
final class ComplexCancellable implements Cancellable {
private final AtomicReference<Cancellable> dependencyRef;
private final AtomicBoolean cancelled;
public ComplexCancellable() {
this.dependencyRef = new AtomicReference<>(null);
this.cancelled = new AtomicBoolean(false);
}
public boolean isCancelled() {
return cancelled.get();
}
public void setDependency(final Cancellable dependency) {
Args.notNull(dependency, "dependency");
if (!cancelled.get()) {
dependencyRef.set(dependency);
} else {
dependency.cancel();
}
}
@Override
public boolean cancel() {
if (cancelled.compareAndSet(false, true)) {
final Cancellable dependency = dependencyRef.getAndSet(null);
if (dependency != null) {
dependency.cancel();
}
return true;
} else {
return false;
}
}
}

View File

@ -186,7 +186,7 @@ public class Request {
}
public void abort() throws UnsupportedOperationException {
this.request.abort();
this.request.cancel();
}
//// HTTP header operations

View File

@ -227,7 +227,7 @@ public class TestConnectionReuse extends LocalServerTestBase {
this.target,
httpget);
if (this.forceClose) {
httpget.abort();
httpget.cancel();
} else {
EntityUtils.consume(response.getEntity());
}

View File

@ -46,8 +46,8 @@ public class ClientAbortMethod {
System.out.println("----------------------------------------");
System.out.println(response.getCode() + " " + response.getReasonPhrase());
// Do not feel like reading the response body
// Call abort on the request object
httpget.abort();
// Call cancel on the request object
httpget.cancel();
}
}
}

View File

@ -1,48 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http;
import org.apache.hc.core5.concurrent.Cancellable;
/**
* This interface represents an object that can be made aware of
* long running, potentially blocking processes.
*
* TODO: replace with CancellableDependency from HttpCore 5.0b2
*/
public interface CancellableAware {
/**
* Sets {@link Cancellable} for the ongoing operation.
*/
void setCancellable(Cancellable cancellable);
boolean isCancelled();
}

View File

@ -30,7 +30,7 @@ import java.io.IOException;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
@ -43,8 +43,7 @@ public interface AsyncExecChain {
public final String exchangeId;
public final HttpRoute route;
public final HttpRequest originalRequest;
//TODO: replace with CancellableDependency from HttpCore
public final ComplexFuture<?> future;
public final CancellableDependency cancellableDependency;
public final HttpClientContext clientContext;
public final AsyncExecRuntime execRuntime;
@ -52,13 +51,13 @@ public interface AsyncExecChain {
final String exchangeId,
final HttpRoute route,
final HttpRequest originalRequest,
final ComplexFuture<?> future,
final CancellableDependency cancellableDependency,
final HttpClientContext clientContext,
final AsyncExecRuntime execRuntime) {
this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
this.route = Args.notNull(route, "Route");
this.originalRequest = Args.notNull(originalRequest, "Original request");
this.future = Args.notNull(future, "Future");
this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
}

View File

@ -29,10 +29,10 @@ package org.apache.hc.client5.http.classic;
import java.io.IOException;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException;
@ -85,6 +85,6 @@ public interface ExecRuntime {
void setConnectionValidFor(TimeValue duration);
ExecRuntime fork(CancellableAware cancellableAware);
ExecRuntime fork(CancellableDependency cancellableAware);
}

View File

@ -30,50 +30,50 @@ import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
public class HttpUriRequestBase extends BasicClassicHttpRequest implements CancellableAware, Configurable {
public class HttpUriRequestBase extends BasicClassicHttpRequest implements CancellableDependency, Configurable {
private static final long serialVersionUID = 1L;
private RequestConfig requestConfig;
private final AtomicBoolean aborted;
private final AtomicBoolean cancelled;
private final AtomicReference<Cancellable> cancellableRef;
public HttpUriRequestBase(final String method, final URI requestUri) {
super(method, requestUri);
this.aborted = new AtomicBoolean(false);
this.cancelled = new AtomicBoolean(false);
this.cancellableRef = new AtomicReference<>(null);
}
public void abort() {
if (this.aborted.compareAndSet(false, true)) {
@Override
public boolean cancel() {
if (this.cancelled.compareAndSet(false, true)) {
final Cancellable cancellable = this.cancellableRef.getAndSet(null);
if (cancellable != null) {
cancellable.cancel();
}
return true;
} else {
return false;
}
}
@Override
public boolean isCancelled() {
return isAborted();
}
public boolean isAborted() {
return this.aborted.get();
return cancelled.get();
}
/**
* @since 4.2
*/
@Override
public void setCancellable(final Cancellable cancellable) {
if (!this.aborted.get()) {
public void setDependency(final Cancellable cancellable) {
if (!this.cancelled.get()) {
this.cancellableRef.set(cancellable);
}
}
@ -88,7 +88,7 @@ public class HttpUriRequestBase extends BasicClassicHttpRequest implements Cance
if (cancellable != null) {
cancellable.cancel();
}
this.aborted.set(false);
this.cancelled.set(false);
}
public void setConfig(final RequestConfig requestConfig) {

View File

@ -153,7 +153,7 @@ public class DefaultHttpRequestRetryHandler implements HttpRequestRetryHandler {
}
}
}
if (request instanceof HttpUriRequestBase && ((HttpUriRequestBase)request).isAborted()) {
if (request instanceof HttpUriRequestBase && ((HttpUriRequestBase)request).isCancelled()) {
return false;
}
if (handleAsIdempotent(request)) {

View File

@ -47,7 +47,7 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.HttpRouteDirector;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
@ -119,7 +119,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
final String exchangeId = scope.exchangeId;
final HttpRoute route = scope.route;
final ComplexFuture<?> future = scope.future;
final CancellableDependency cancellableDependency = scope.cancellableDependency;
final HttpClientContext clientContext = scope.clientContext;
final AsyncExecRuntime execRuntime = scope.execRuntime;
final State state = new State(route);
@ -149,7 +149,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": acquiring connection with route " + route);
}
future.setDependency(execRuntime.acquireConnection(
cancellableDependency.setDependency(execRuntime.acquireConnection(
route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
@ -184,7 +184,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
final RouteTracker tracker = state.tracker;
final AsyncExecRuntime execRuntime = scope.execRuntime;
final HttpRoute route = scope.route;
final ComplexFuture<?> future = scope.future;
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext clientContext = scope.clientContext;
int step;
@ -193,7 +193,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
step = routeDirector.nextStep(route, fact);
switch (step) {
case HttpRouteDirector.CONNECT_TARGET:
future.setDependency(execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
operation.setDependency(execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {
@ -216,7 +216,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
return;
case HttpRouteDirector.CONNECT_PROXY:
future.setDependency(execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
operation.setDependency(execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {

View File

@ -160,7 +160,7 @@ class AsyncRedirectExec implements AsyncExecChainHandler {
}
}
state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute,
scope.originalRequest, scope.future, clientContext, scope.execRuntime);
scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime);
}
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.ExecRuntime;
import org.apache.hc.client5.http.config.RequestConfig;
@ -41,6 +40,7 @@ import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.io.LeaseRequest;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
@ -58,7 +58,7 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
private final HttpClientConnectionManager manager;
private final HttpRequestExecutor requestExecutor;
private final CancellableAware cancellableAware;
private final CancellableDependency cancellableDependency;
private final AtomicReference<ConnectionEndpoint> endpointRef;
private volatile boolean reusable;
@ -69,19 +69,19 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
final Logger log,
final HttpClientConnectionManager manager,
final HttpRequestExecutor requestExecutor,
final CancellableAware cancellableAware) {
final CancellableDependency cancellableDependency) {
super();
this.log = log;
this.manager = manager;
this.requestExecutor = requestExecutor;
this.cancellableAware = cancellableAware;
this.cancellableDependency = cancellableDependency;
this.endpointRef = new AtomicReference<>(null);
this.validDuration = TimeValue.NEG_ONE_MILLISECONDS;
}
@Override
public boolean isExecutionAborted() {
return cancellableAware != null && cancellableAware.isCancelled();
return cancellableDependency != null && cancellableDependency.isCancelled();
}
@Override
@ -97,19 +97,19 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
final Timeout requestTimeout = requestConfig.getConnectionRequestTimeout();
final LeaseRequest connRequest = manager.lease(route, requestTimeout, object);
state = object;
if (cancellableAware != null) {
if (cancellableAware.isCancelled()) {
if (cancellableDependency != null) {
if (cancellableDependency.isCancelled()) {
connRequest.cancel();
throw new RequestFailedException("Request aborted");
}
cancellableAware.setCancellable(connRequest);
cancellableDependency.setDependency(connRequest);
}
try {
final ConnectionEndpoint connectionEndpoint = connRequest.get(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
endpointRef.set(connectionEndpoint);
reusable = connectionEndpoint.isConnected();
if (cancellableAware != null) {
cancellableAware.setCancellable(this);
if (cancellableDependency != null) {
cancellableDependency.setDependency(this);
}
} catch(final TimeoutException ex) {
throw new ConnectionRequestTimeoutException(ex.getMessage());
@ -143,8 +143,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
}
private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
if (cancellableAware != null) {
if (cancellableAware.isCancelled()) {
if (cancellableDependency != null) {
if (cancellableDependency.isCancelled()) {
throw new RequestFailedException("Request aborted");
}
}
@ -256,8 +256,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
}
@Override
public ExecRuntime fork(final CancellableAware cancellableAware) {
return new InternalExecRuntime(log, manager, requestExecutor, cancellableAware);
public ExecRuntime fork(final CancellableDependency cancellableDependency) {
return new InternalExecRuntime(log, manager, requestExecutor, cancellableDependency);
}
}

View File

@ -31,7 +31,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
@ -49,6 +48,7 @@ import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException;
@ -160,7 +160,7 @@ class InternalHttpClient extends CloseableHttpClient implements Configurable {
final HttpRoute route = determineRoute(target, request, localcontext);
final String exchangeId = ExecSupport.getNextExchangeId();
final ExecRuntime execRuntime = new InternalExecRuntime(log, connManager, requestExecutor,
request instanceof CancellableAware ? (CancellableAware) request : null);
request instanceof CancellableDependency ? (CancellableDependency) request : null);
final ExecChain.Scope scope = new ExecChain.Scope(exchangeId, route, request, execRuntime, localcontext);
final ClassicHttpResponse response = this.execChain.execute(ClassicRequestCopier.INSTANCE.copy(request), scope);
return CloseableHttpResponse.adapt(response);

View File

@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.classic;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.SchemePortResolver;
@ -45,6 +44,7 @@ import org.apache.hc.client5.http.protocol.RequestClientConnControl;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
@ -122,7 +122,7 @@ public class MinimalHttpClient extends CloseableHttpClient {
final HttpRoute route = new HttpRoute(RoutingSupport.normalize(target, schemePortResolver));
final ExecRuntime execRuntime = new InternalExecRuntime(log, connManager, requestExecutor,
request instanceof CancellableAware ? (CancellableAware) request : null);
request instanceof CancellableDependency ? (CancellableDependency) request : null);
try {
if (!execRuntime.isConnectionAcquired()) {
execRuntime.acquireConnection(route, null, clientContext);

View File

@ -60,7 +60,7 @@ public class TestDefaultHttpRequestRetryHandler {
@Test
public void noRetryOnAbortedRequests() throws Exception{
final HttpGet request = new HttpGet("/");
request.abort();
request.cancel();
final DefaultHttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler();

View File

@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hc.client5.http.CancellableAware;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.io.ConnectionEndpoint;
@ -38,6 +37,7 @@ import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.io.LeaseRequest;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
@ -64,7 +64,7 @@ public class TestInternalExecRuntime {
@Mock
private HttpRequestExecutor requestExecutor;
@Mock
private CancellableAware cancellableAware;
private CancellableDependency cancellableDependency;
@Mock
private ConnectionEndpoint connectionEndpoint;
@ -75,7 +75,7 @@ public class TestInternalExecRuntime {
public void setup() {
MockitoAnnotations.initMocks(this);
route = new HttpRoute(new HttpHost("host", 80));
execRuntime = new InternalExecRuntime(log, mgr, requestExecutor, cancellableAware);
execRuntime = new InternalExecRuntime(log, mgr, requestExecutor, cancellableDependency);
}
@Test
@ -100,9 +100,9 @@ public class TestInternalExecRuntime {
Assert.assertFalse(execRuntime.isConnectionReusable());
Mockito.verify(leaseRequest).get(345, TimeUnit.MILLISECONDS);
Mockito.verify(cancellableAware, Mockito.times(1)).setCancellable(leaseRequest);
Mockito.verify(cancellableAware, Mockito.times(1)).setCancellable(execRuntime);
Mockito.verify(cancellableAware, Mockito.times(2)).setCancellable(Mockito.<Cancellable>any());
Mockito.verify(cancellableDependency, Mockito.times(1)).setDependency(leaseRequest);
Mockito.verify(cancellableDependency, Mockito.times(1)).setDependency(execRuntime);
Mockito.verify(cancellableDependency, Mockito.times(2)).setDependency(Mockito.<Cancellable>any());
}
@Test(expected = IllegalStateException.class)