From 29ba623ebeec67cd6e8d940b2fed9151c16e4daa Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Sun, 20 Jun 2021 21:39:18 +0200 Subject: [PATCH] Fixed connection lease request cancellation race in both classic and asyc pooling connection managers --- .../client5/testing/async/TestHttp1Async.java | 78 ++++++++++ .../sync/TestClientRequestExecution.java | 62 ++++++++ .../impl/classic/InternalExecRuntime.java | 17 +-- .../PoolingHttpClientConnectionManager.java | 17 +-- .../PoolingAsyncClientConnectionManager.java | 140 +++++++++++------- ...estPoolingHttpClientConnectionManager.java | 35 ----- 6 files changed, 235 insertions(+), 114 deletions(-) diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java index 5d1eccdf2..3cdacd720 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1Async.java @@ -28,7 +28,12 @@ package org.apache.hc.client5.testing.async; import java.util.Arrays; import java.util.Collection; +import java.util.Random; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; @@ -200,4 +205,77 @@ public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest future = httpclient.execute( + SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/1000") + .build(), null); + + executorService.schedule(new Runnable() { + + @Override + public void run() { + future.cancel(true); + } + }, i % 5, TimeUnit.MILLISECONDS); + + try { + future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } catch (final TimeoutException ex) { + throw ex; + } catch (final Exception ignore) { + } + } + + final Random rnd = new Random(); + for (int i = 0; i < 20; i++) { + final Future future = httpclient.execute( + SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/1000") + .build(), null); + + executorService.schedule(new Runnable() { + + @Override + public void run() { + future.cancel(true); + } + }, rnd.nextInt(200), TimeUnit.MILLISECONDS); + + try { + future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } catch (final TimeoutException ex) { + throw ex; + } catch (final Exception ignore) { + } + } + + for (int i = 0; i < 5; i++) { + final Future future = httpclient.execute( + SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/1000") + .build(), null); + final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + MatcherAssert.assertThat(response, CoreMatchers.notNullValue()); + MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(200)); + } + + } finally { + executorService.shutdownNow(); + } + } + } \ No newline at end of file diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java index d6f2bc20e..bfb16c33f 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestClientRequestExecution.java @@ -29,6 +29,10 @@ package org.apache.hc.client5.testing.sync; import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URI; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.hc.client5.http.HttpRequestRetryStrategy; import org.apache.hc.client5.http.classic.methods.HttpGet; @@ -281,4 +285,62 @@ public class TestClientRequestExecution extends LocalServerTestBase { Assert.assertEquals(uri, location); } + @Test + public void testRequestCancellation() throws Exception { + this.connManager.setDefaultMaxPerRoute(1); + this.connManager.setMaxTotal(1); + + final HttpHost target = start(); + + final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + try { + + for (int i = 0; i < 20; i++) { + final HttpGet httpget = new HttpGet("/random/1000"); + + executorService.schedule(new Runnable() { + + @Override + public void run() { + httpget.cancel(); + } + }, 1, TimeUnit.MILLISECONDS); + + try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) { + EntityUtils.consume(response.getEntity()); + } catch (final Exception ignore) { + } + } + + final Random rnd = new Random(); + for (int i = 0; i < 20; i++) { + final HttpGet httpget = new HttpGet("/random/1000"); + + executorService.schedule(new Runnable() { + + @Override + public void run() { + httpget.cancel(); + } + }, rnd.nextInt(200), TimeUnit.MILLISECONDS); + + try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) { + EntityUtils.consume(response.getEntity()); + } catch (final Exception ignore) { + } + + } + + for (int i = 0; i < 5; i++) { + final HttpGet httpget = new HttpGet("/random/1000"); + try (final ClassicHttpResponse response = this.httpclient.execute(target, httpget)) { + EntityUtils.consume(response.getEntity()); + } + } + + } finally { + executorService.shutdownNow(); + } + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java index ba0d0d1b3..790f076af 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/InternalExecRuntime.java @@ -104,10 +104,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable { state = object; if (cancellableDependency != null) { cancellableDependency.setDependency(connRequest); - if (cancellableDependency.isCancelled()) { - connRequest.cancel(); - throw new RequestFailedException("Request aborted"); - } } try { final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout); @@ -115,10 +111,6 @@ class InternalExecRuntime implements ExecRuntime, Cancellable { reusable = connectionEndpoint.isConnected(); if (cancellableDependency != null) { cancellableDependency.setDependency(this); - if (cancellableDependency.isCancelled()) { - cancel(); - throw new RequestFailedException("Request aborted"); - } } if (log.isDebugEnabled()) { log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint)); @@ -155,10 +147,8 @@ class InternalExecRuntime implements ExecRuntime, Cancellable { } private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException { - if (cancellableDependency != null) { - if (cancellableDependency.isCancelled()) { - throw new RequestFailedException("Request aborted"); - } + if (isExecutionAborted()) { + throw new RequestFailedException("Request aborted"); } final RequestConfig requestConfig = context.getRequestConfig(); final Timeout connectTimeout = requestConfig.getConnectTimeout(); @@ -208,6 +198,9 @@ class InternalExecRuntime implements ExecRuntime, Cancellable { if (!endpoint.isConnected()) { connectEndpoint(endpoint, context); } + if (isExecutionAborted()) { + throw new RequestFailedException("Request aborted"); + } final RequestConfig requestConfig = context.getRequestConfig(); final Timeout responseTimeout = requestConfig.getResponseTimeout(); if (responseTimeout != null) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java index 00b40e1e9..22a29e980 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java @@ -28,7 +28,6 @@ package org.apache.hc.client5.http.impl.io; import java.io.IOException; import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -288,9 +287,6 @@ public class PoolingHttpClientConnectionManager final PoolEntry poolEntry; try { poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit()); - if (poolEntry == null || leaseFuture.isCancelled()) { - throw new ExecutionException(new CancellationException("Operation cancelled")); - } } catch (final TimeoutException ex) { leaseFuture.cancel(true); throw ex; @@ -325,16 +321,9 @@ public class PoolingHttpClientConnectionManager } else { poolEntry.assignConnection(connFactory.createConnection(null)); } - if (leaseFuture.isCancelled()) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} endpoint lease cancelled", id); - } - pool.release(poolEntry, false); - } else { - this.endpoint = new InternalConnectionEndpoint(poolEntry); - if (LOG.isDebugEnabled()) { - LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint)); - } + this.endpoint = new InternalConnectionEndpoint(poolEntry); + if (LOG.isDebugEnabled()) { + LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint)); } return this.endpoint; } catch (final Exception ex) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index 53c34ab62..3915e3a28 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -29,7 +29,10 @@ package org.apache.hc.client5.http.impl.nio; import java.net.InetSocketAddress; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -48,6 +51,7 @@ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Resolver; @@ -228,72 +232,102 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio if (LOG.isDebugEnabled()) { LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool)); } - final ComplexFuture resultFuture = new ComplexFuture<>(callback); - final ConnectionConfig connectionConfig = resolveConnectionConfig(route); - final Future> leaseFuture = pool.lease( - route, state, requestTimeout, new FutureCallback>() { + return new Future() { - void leaseCompleted(final PoolEntry poolEntry) { - final ManagedAsyncClientConnection connection = poolEntry.getConnection(); - if (connection != null) { - connection.activate(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool)); - } - final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry); - if (LOG.isDebugEnabled()) { - LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint)); - } - resultFuture.completed(endpoint); - } + final ConnectionConfig connectionConfig = resolveConnectionConfig(route); + final BasicFuture resultFuture = new BasicFuture<>(callback); - @Override - public void completed(final PoolEntry poolEntry) { - final ManagedAsyncClientConnection connection = poolEntry.getConnection(); - final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null; - if (TimeValue.isNonNegative(timeValue) && connection != null && - poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) { - final ProtocolVersion protocolVersion = connection.getProtocolVersion(); - if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) { - connection.submitCommand(new PingCommand(new BasicPingHandler(result -> { - if (result == null || !result) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection)); + final Future> leaseFuture = pool.lease( + route, + state, + requestTimeout, new FutureCallback>() { + + @Override + public void completed(final PoolEntry poolEntry) { + final ManagedAsyncClientConnection connection = poolEntry.getConnection(); + final TimeValue timeValue = connectionConfig != null ? connectionConfig.getValidateAfterInactivity() : null; + if (TimeValue.isNonNegative(timeValue) && connection != null && + poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) { + final ProtocolVersion protocolVersion = connection.getProtocolVersion(); + if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) { + connection.submitCommand(new PingCommand(new BasicPingHandler(result -> { + if (result == null || !result) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection)); + } + poolEntry.discardConnection(CloseMode.IMMEDIATE); } - poolEntry.discardConnection(CloseMode.IMMEDIATE); + })), Command.Priority.IMMEDIATE); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection)); } - })), Command.Priority.IMMEDIATE); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection)); + poolEntry.discardConnection(CloseMode.IMMEDIATE); } - poolEntry.discardConnection(CloseMode.IMMEDIATE); } + leaseCompleted(poolEntry); } - leaseCompleted(poolEntry); - } - @Override - public void failed(final Exception ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} endpoint lease failed", id); + void leaseCompleted(final PoolEntry poolEntry) { + final ManagedAsyncClientConnection connection = poolEntry.getConnection(); + if (connection != null) { + connection.activate(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool)); + } + final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry); + if (LOG.isDebugEnabled()) { + LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint)); + } + resultFuture.completed(endpoint); } - resultFuture.failed(ex); - } - @Override - public void cancelled() { - if (LOG.isDebugEnabled()) { - LOG.debug("{} endpoint lease cancelled", id); + @Override + public void failed(final Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} endpoint lease failed", id); + } + resultFuture.failed(ex); } - resultFuture.cancel(); - } - }); + @Override + public void cancelled() { + if (LOG.isDebugEnabled()) { + LOG.debug("{} endpoint lease cancelled", id); + } + resultFuture.cancel(); + } - resultFuture.setDependency(leaseFuture); - return resultFuture; + }); + + @Override + public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException { + return resultFuture.get(); + } + + @Override + public AsyncConnectionEndpoint get( + final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return resultFuture.get(timeout, unit); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return leaseFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isDone() { + return resultFuture.isDone(); + } + + @Override + public boolean isCancelled() { + return resultFuture.isCancelled(); + } + + }; } @Override diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java index 14cce077f..29dbdd9b9 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java @@ -30,7 +30,6 @@ package org.apache.hc.client5.http.impl.io; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -100,11 +99,8 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); entry.assignConnection(conn); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); - Mockito.when(conn.isOpen()).thenReturn(true); Mockito.when(conn.isConsistent()).thenReturn(true); - Mockito.when(future.isCancelled()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route), @@ -130,9 +126,6 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); - - Mockito.when(future.isCancelled()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route), @@ -151,28 +144,6 @@ public class TestPoolingHttpClientConnectionManager { Mockito.verify(pool).release(entry, false); } - @Test - public void testLeaseFutureCancelled() throws Exception { - final HttpHost target = new HttpHost("localhost", 80); - final HttpRoute route = new HttpRoute(target); - - final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); - entry.assignConnection(conn); - - Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE); - Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); - Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) - .thenReturn(future); - - final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); - Assert.assertThrows(ExecutionException.class, () -> - connRequest1.get(Timeout.ofSeconds(1))); - } - @Test public void testLeaseFutureTimeout() throws Exception { final HttpHost target = new HttpHost("localhost", 80); @@ -199,7 +170,6 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); entry.assignConnection(conn); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route), @@ -229,7 +199,6 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); entry.assignConnection(conn); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route), @@ -260,9 +229,7 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); entry.assignConnection(conn); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); Mockito.when(conn.isOpen()).thenReturn(false); - Mockito.when(future.isCancelled()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route), @@ -313,9 +280,7 @@ public class TestPoolingHttpClientConnectionManager { final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); entry.assignConnection(conn); - Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE); Mockito.when(conn.isOpen()).thenReturn(false); - Mockito.when(future.isCancelled()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( Mockito.eq(route),