Fixed connection lease request cancellation race in both classic and asyc pooling connection managers

This commit is contained in:
Oleg Kalnichevski 2021-06-20 21:39:18 +02:00
parent aff1d2024c
commit 29ba623ebe
6 changed files with 235 additions and 114 deletions

View File

@ -28,7 +28,12 @@ package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
@ -200,4 +205,77 @@ public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest<CloseableH
MatcherAssert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
}
@Test
public void testRequestCancellation() throws Exception {
this.connManager.setDefaultMaxPerRoute(1);
this.connManager.setMaxTotal(1);
final HttpHost target = start();
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
try {
for (int i = 0; i < 20; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/1000")
.build(), null);
executorService.schedule(new Runnable() {
@Override
public void run() {
future.cancel(true);
}
}, i % 5, TimeUnit.MILLISECONDS);
try {
future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
} catch (final TimeoutException ex) {
throw ex;
} catch (final Exception ignore) {
}
}
final Random rnd = new Random();
for (int i = 0; i < 20; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/1000")
.build(), null);
executorService.schedule(new Runnable() {
@Override
public void run() {
future.cancel(true);
}
}, rnd.nextInt(200), TimeUnit.MILLISECONDS);
try {
future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
} catch (final TimeoutException ex) {
throw ex;
} catch (final Exception ignore) {
}
}
for (int i = 0; i < 5; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/1000")
.build(), null);
final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
MatcherAssert.assertThat(response, CoreMatchers.notNullValue());
MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
}
} finally {
executorService.shutdownNow();
}
}
}

View File

@ -29,6 +29,10 @@ package org.apache.hc.client5.testing.sync;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.classic.methods.HttpGet;
@ -281,4 +285,62 @@ public class TestClientRequestExecution extends LocalServerTestBase {
Assert.assertEquals(uri, location);
}
@Test
public void testRequestCancellation() throws Exception {
this.connManager.setDefaultMaxPerRoute(1);
this.connManager.setMaxTotal(1);
final HttpHost target = start();
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
try {
for (int i = 0; i < 20; i++) {
final HttpGet httpget = new HttpGet("/random/1000");
executorService.schedule(new Runnable() {
@Override
public void run() {
httpget.cancel();
}
}, 1, TimeUnit.MILLISECONDS);
try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
EntityUtils.consume(response.getEntity());
} catch (final Exception ignore) {
}
}
final Random rnd = new Random();
for (int i = 0; i < 20; i++) {
final HttpGet httpget = new HttpGet("/random/1000");
executorService.schedule(new Runnable() {
@Override
public void run() {
httpget.cancel();
}
}, rnd.nextInt(200), TimeUnit.MILLISECONDS);
try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
EntityUtils.consume(response.getEntity());
} catch (final Exception ignore) {
}
}
for (int i = 0; i < 5; i++) {
final HttpGet httpget = new HttpGet("/random/1000");
try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) {
EntityUtils.consume(response.getEntity());
}
}
} finally {
executorService.shutdownNow();
}
}
}

View File

@ -104,10 +104,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
state = object;
if (cancellableDependency != null) {
cancellableDependency.setDependency(connRequest);
if (cancellableDependency.isCancelled()) {
connRequest.cancel();
throw new RequestFailedException("Request aborted");
}
}
try {
final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
@ -115,10 +111,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
reusable = connectionEndpoint.isConnected();
if (cancellableDependency != null) {
cancellableDependency.setDependency(this);
if (cancellableDependency.isCancelled()) {
cancel();
throw new RequestFailedException("Request aborted");
}
}
if (log.isDebugEnabled()) {
log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
@ -155,10 +147,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
}
private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
if (cancellableDependency != null) {
if (cancellableDependency.isCancelled()) {
throw new RequestFailedException("Request aborted");
}
if (isExecutionAborted()) {
throw new RequestFailedException("Request aborted");
}
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout connectTimeout = requestConfig.getConnectTimeout();
@ -208,6 +198,9 @@ class InternalExecRuntime implements ExecRuntime, Cancellable {
if (!endpoint.isConnected()) {
connectEndpoint(endpoint, context);
}
if (isExecutionAborted()) {
throw new RequestFailedException("Request aborted");
}
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
if (responseTimeout != null) {

View File

@ -28,7 +28,6 @@ package org.apache.hc.client5.http.impl.io;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@ -288,9 +287,6 @@ public class PoolingHttpClientConnectionManager
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
try {
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
if (poolEntry == null || leaseFuture.isCancelled()) {
throw new ExecutionException(new CancellationException("Operation cancelled"));
}
} catch (final TimeoutException ex) {
leaseFuture.cancel(true);
throw ex;
@ -325,16 +321,9 @@ public class PoolingHttpClientConnectionManager
} else {
poolEntry.assignConnection(connFactory.createConnection(null));
}
if (leaseFuture.isCancelled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease cancelled", id);
}
pool.release(poolEntry, false);
} else {
this.endpoint = new InternalConnectionEndpoint(poolEntry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
this.endpoint = new InternalConnectionEndpoint(poolEntry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
return this.endpoint;
} catch (final Exception ex) {

View File

@ -29,7 +29,10 @@ package org.apache.hc.client5.http.impl.nio;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -48,6 +51,7 @@ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Resolver;
@ -228,72 +232,102 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
}
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
return new Future<AsyncConnectionEndpoint>() {
void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
if (connection != null) {
connection.activate();
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
}
final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
resultFuture.completed(endpoint);
}
final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
@Override
public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null;
if (TimeValue.isNonNegative(timeValue) && connection != null &&
poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
final ProtocolVersion protocolVersion = connection.getProtocolVersion();
if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
if (result == null || !result) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
route,
state,
requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
@Override
public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null;
if (TimeValue.isNonNegative(timeValue) && connection != null &&
poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
final ProtocolVersion protocolVersion = connection.getProtocolVersion();
if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
if (result == null || !result) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
}
poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
poolEntry.discardConnection(CloseMode.IMMEDIATE);
})), Command.Priority.IMMEDIATE);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
}
})), Command.Priority.IMMEDIATE);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
leaseCompleted(poolEntry);
}
leaseCompleted(poolEntry);
}
@Override
public void failed(final Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease failed", id);
void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
if (connection != null) {
connection.activate();
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
}
final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
}
resultFuture.completed(endpoint);
}
resultFuture.failed(ex);
}
@Override
public void cancelled() {
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease cancelled", id);
@Override
public void failed(final Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease failed", id);
}
resultFuture.failed(ex);
}
resultFuture.cancel();
}
});
@Override
public void cancelled() {
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease cancelled", id);
}
resultFuture.cancel();
}
resultFuture.setDependency(leaseFuture);
return resultFuture;
});
@Override
public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
return resultFuture.get();
}
@Override
public AsyncConnectionEndpoint get(
final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return resultFuture.get(timeout, unit);
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return leaseFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isDone() {
return resultFuture.isDone();
}
@Override
public boolean isCancelled() {
return resultFuture.isCancelled();
}
};
}
@Override

View File

@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.io;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -100,11 +99,8 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(conn.isOpen()).thenReturn(true);
Mockito.when(conn.isConsistent()).thenReturn(true);
Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@ -130,9 +126,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@ -151,28 +144,6 @@ public class TestPoolingHttpClientConnectionManager {
Mockito.verify(pool).release(entry, false);
}
@Test
public void testLeaseFutureCancelled() throws Exception {
final HttpHost target = new HttpHost("localhost", 80);
final HttpRoute route = new HttpRoute(target);
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
Mockito.eq(null),
Mockito.any(),
Mockito.eq(null)))
.thenReturn(future);
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
Assert.assertThrows(ExecutionException.class, () ->
connRequest1.get(Timeout.ofSeconds(1)));
}
@Test
public void testLeaseFutureTimeout() throws Exception {
final HttpHost target = new HttpHost("localhost", 80);
@ -199,7 +170,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@ -229,7 +199,6 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@ -260,9 +229,7 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(conn.isOpen()).thenReturn(false);
Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
@ -313,9 +280,7 @@ public class TestPoolingHttpClientConnectionManager {
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
entry.assignConnection(conn);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(conn.isOpen()).thenReturn(false);
Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),