Fixes #2065 - Backport #347 to Jetty 9.2.x.

This commit is contained in:
Simone Bordet 2017-12-15 17:48:47 +01:00
parent e449387158
commit ce4fad3deb
3 changed files with 124 additions and 13 deletions

View File

@ -169,9 +169,12 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
int port = request.getPort(); int port = request.getPort();
if (port >= 0 && getPort() != port) if (port >= 0 && getPort() != port)
throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this); throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
send(new HttpExchange(this, request, listeners));
}
HttpExchange exchange = new HttpExchange(this, request, listeners); public void send(HttpExchange exchange)
{
HttpRequest request = exchange.getRequest();
if (client.isRunning()) if (client.isRunning())
{ {
if (enqueue(exchanges, exchange)) if (enqueue(exchanges, exchange))

View File

@ -19,9 +19,11 @@
package org.eclipse.jetty.client.http; package org.eclipse.jetty.client.http;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
@ -40,6 +42,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
{ {
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
private final AtomicLong idleTime = new AtomicLong(System.nanoTime());
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger(); private final AtomicInteger sweeps = new AtomicInteger();
private final Promise<Connection> promise; private final Promise<Connection> promise;
@ -105,13 +108,57 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return closed.get(); return closed.get();
} }
private void send(HttpChannelOverHTTP channel, HttpExchange exchange)
{
boolean send;
while (true)
{
long idleTime = this.idleTime.get();
send = idleTime != Long.MIN_VALUE;
if (send && !this.idleTime.compareAndSet(idleTime, System.nanoTime()))
continue;
break;
}
if (send)
{
if (channel.associate(exchange))
channel.send();
else
channel.release();
}
else
{
// This connection idle timed out before we could send the exchange, retry.
HttpDestinationOverHTTP destination = getHttpDestination();
destination.send(exchange);
}
}
@Override @Override
protected boolean onReadTimeout() protected boolean onReadTimeout()
{ {
if (LOG.isDebugEnabled()) while (true)
LOG.debug("{} idle timeout", this); {
close(new TimeoutException()); long idleTime = this.idleTime.get();
return false; long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime);
long timeout = getEndPoint().getIdleTimeout();
boolean idle = elapsed > timeout + timeout / 2;
if (idle)
{
if (!this.idleTime.compareAndSet(idleTime, Long.MIN_VALUE))
continue;
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout", this);
close(new TimeoutException());
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout skipped", this);
}
return false;
}
} }
@Override @Override
@ -215,10 +262,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
endPoint.setIdleTimeout(request.getIdleTimeout()); endPoint.setIdleTimeout(request.getIdleTimeout());
// One channel per connection, just delegate the send // One channel per connection, just delegate the send
if (channel.associate(exchange)) HttpConnectionOverHTTP.this.send(channel, exchange);
channel.send();
else
channel.release();
} }
@Override @Override

View File

@ -40,7 +40,9 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider;
@ -53,7 +55,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume; import org.junit.Assume;
@ -462,7 +466,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}); });
Assert.fail(); Assert.fail();
} }
catch (Exception expected) catch (Exception ignored)
{ {
} }
@ -472,7 +476,67 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertNull(request.getAbortCause()); Assert.assertNull(request.getAbortCause());
} }
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException @Test
public void testIdleTimeoutJustBeforeSendingRequest() throws Exception
{
start(new EmptyServerHandler());
final long idleTimeout = 1000;
client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
public boolean associate(HttpExchange exchange)
{
try
{
// We idle timeout just before the association,
// we must be able to send the request successfully.
Thread.sleep(idleTimeout + idleTimeout / 4);
return super.associate(exchange);
}
catch (InterruptedException e)
{
return false;
}
}
};
}
};
}
}, sslContextFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client.setExecutor(clientThreads);
client.setIdleTimeout(idleTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
private void assumeConnectTimeout(String host, int port, int connectTimeout)
{ {
try (Socket socket = new Socket()) try (Socket socket = new Socket())
{ {
@ -500,7 +564,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
{ {
private final long timeout; private final long timeout;
public TimeoutHandler(long timeout) private TimeoutHandler(long timeout)
{ {
this.timeout = timeout; this.timeout = timeout;
} }