From ce4fad3deb283b8cad19f574beae62f19a250c24 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 15 Dec 2017 17:48:47 +0100 Subject: [PATCH] Fixes #2065 - Backport #347 to Jetty 9.2.x. --- .../eclipse/jetty/client/HttpDestination.java | 7 +- .../client/http/HttpConnectionOverHTTP.java | 60 +++++++++++++--- .../jetty/client/HttpClientTimeoutTest.java | 70 ++++++++++++++++++- 3 files changed, 124 insertions(+), 13 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index f99ff66cbd7..ff26c741adb 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -169,9 +169,12 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl int port = request.getPort(); if (port >= 0 && getPort() != port) throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this); + send(new HttpExchange(this, request, listeners)); + } - HttpExchange exchange = new HttpExchange(this, request, listeners); - + public void send(HttpExchange exchange) + { + HttpRequest request = exchange.getRequest(); if (client.isRunning()) { if (enqueue(exchanges, exchange)) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index f00a204c2ba..1fa600fe417 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -19,9 +19,11 @@ package org.eclipse.jetty.client.http; import java.nio.channels.AsynchronousCloseException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -40,6 +42,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec { private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); + private final AtomicLong idleTime = new AtomicLong(System.nanoTime()); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Promise promise; @@ -105,13 +108,57 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec return closed.get(); } + private void send(HttpChannelOverHTTP channel, HttpExchange exchange) + { + boolean send; + while (true) + { + long idleTime = this.idleTime.get(); + send = idleTime != Long.MIN_VALUE; + if (send && !this.idleTime.compareAndSet(idleTime, System.nanoTime())) + continue; + break; + } + + if (send) + { + if (channel.associate(exchange)) + channel.send(); + else + channel.release(); + } + else + { + // This connection idle timed out before we could send the exchange, retry. + HttpDestinationOverHTTP destination = getHttpDestination(); + destination.send(exchange); + } + } + @Override protected boolean onReadTimeout() { - if (LOG.isDebugEnabled()) - LOG.debug("{} idle timeout", this); - close(new TimeoutException()); - return false; + while (true) + { + long idleTime = this.idleTime.get(); + long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); + long timeout = getEndPoint().getIdleTimeout(); + boolean idle = elapsed > timeout + timeout / 2; + if (idle) + { + if (!this.idleTime.compareAndSet(idleTime, Long.MIN_VALUE)) + continue; + if (LOG.isDebugEnabled()) + LOG.debug("{} idle timeout", this); + close(new TimeoutException()); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("{} idle timeout skipped", this); + } + return false; + } } @Override @@ -215,10 +262,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec endPoint.setIdleTimeout(request.getIdleTimeout()); // One channel per connection, just delegate the send - if (channel.associate(exchange)) - channel.send(); - else - channel.release(); + HttpConnectionOverHTTP.this.send(channel, exchange); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java index d5c42d00a61..585c1a7d623 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java @@ -40,7 +40,9 @@ import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpChannelOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.InputStreamContentProvider; @@ -53,7 +55,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Assume; @@ -462,7 +466,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest }); Assert.fail(); } - catch (Exception expected) + catch (Exception ignored) { } @@ -472,7 +476,67 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest Assert.assertNull(request.getAbortCause()); } - private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException + @Test + public void testIdleTimeoutJustBeforeSendingRequest() throws Exception + { + start(new EmptyServerHandler()); + + final long idleTimeout = 1000; + client = new HttpClient(new HttpClientTransportOverHTTP(1) + { + @Override + protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise) + { + return new HttpConnectionOverHTTP(endPoint, destination, promise) + { + @Override + protected HttpChannelOverHTTP newHttpChannel() + { + return new HttpChannelOverHTTP(this) + { + @Override + public boolean associate(HttpExchange exchange) + { + try + { + // We idle timeout just before the association, + // we must be able to send the request successfully. + Thread.sleep(idleTimeout + idleTimeout / 4); + return super.associate(exchange); + } + catch (InterruptedException e) + { + return false; + } + } + }; + } + }; + } + }, sslContextFactory); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + client.setExecutor(clientThreads); + client.setIdleTimeout(idleTimeout); + client.start(); + + final CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS)); + } + + private void assumeConnectTimeout(String host, int port, int connectTimeout) { try (Socket socket = new Socket()) { @@ -500,7 +564,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest { private final long timeout; - public TimeoutHandler(long timeout) + private TimeoutHandler(long timeout) { this.timeout = timeout; }