Implemented as integer guard + timestamp. The timestamp is not enough to guard against long times spent sending (and/or in application callbacks during sends).
This commit is contained in:
parent
ce4fad3deb
commit
eaa4c70cfc
|
@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.client.HttpConnection;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
|
@ -42,13 +41,14 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
|
||||
|
||||
private final AtomicLong idleTime = new AtomicLong(System.nanoTime());
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final AtomicInteger sweeps = new AtomicInteger();
|
||||
private final Promise<Connection> promise;
|
||||
private final Delegate delegate;
|
||||
private final HttpChannelOverHTTP channel;
|
||||
private long idleTimeout;
|
||||
private int idleTimeoutGuard;
|
||||
private long idleTimeoutStamp;
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #HttpConnectionOverHTTP(EndPoint, HttpDestination, Promise)} instead
|
||||
|
@ -111,13 +111,11 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
private void send(HttpChannelOverHTTP channel, HttpExchange exchange)
|
||||
{
|
||||
boolean send;
|
||||
while (true)
|
||||
synchronized (this)
|
||||
{
|
||||
long idleTime = this.idleTime.get();
|
||||
send = idleTime != Long.MIN_VALUE;
|
||||
if (send && !this.idleTime.compareAndSet(idleTime, System.nanoTime()))
|
||||
continue;
|
||||
break;
|
||||
send = idleTimeoutGuard >= 0;
|
||||
if (send)
|
||||
++idleTimeoutGuard;
|
||||
}
|
||||
|
||||
if (send)
|
||||
|
@ -126,6 +124,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
channel.send();
|
||||
else
|
||||
channel.release();
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
--idleTimeoutGuard;
|
||||
idleTimeoutStamp = System.nanoTime();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -138,24 +142,24 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
{
|
||||
while (true)
|
||||
synchronized (this)
|
||||
{
|
||||
long idleTime = this.idleTime.get();
|
||||
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime);
|
||||
long timeout = getEndPoint().getIdleTimeout();
|
||||
boolean idle = elapsed > timeout + timeout / 2;
|
||||
if (idle)
|
||||
if (idleTimeoutGuard == 0)
|
||||
{
|
||||
if (!this.idleTime.compareAndSet(idleTime, Long.MIN_VALUE))
|
||||
continue;
|
||||
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTimeoutStamp);
|
||||
boolean idle = elapsed > idleTimeout / 2;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout", this);
|
||||
close(new TimeoutException());
|
||||
LOG.debug("Idle timeout {}/{}ms - {}", elapsed, idleTimeout, this);
|
||||
if (idle)
|
||||
{
|
||||
idleTimeoutStamp = -1;
|
||||
close(new TimeoutException("Idle timeout expired: " + idleTimeout + "ms"));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout skipped", this);
|
||||
LOG.debug("Idle timeout skipped - {}", this);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -486,37 +486,24 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
@Test
|
||||
public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler());
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (target.endsWith("/one"))
|
||||
baseRequest.getHttpChannel().getEndPoint().close();
|
||||
else
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
|
||||
client.setMaxConnectionsPerDestination(1);
|
||||
final long idleTimeout = 1000;
|
||||
client.setIdleTimeout(idleTimeout);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(3);
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.path("/one")
|
||||
.listener(new Request.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onBegin(Request request)
|
||||
{
|
||||
try
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
x.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Request request, Throwable failure)
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
})
|
||||
.onResponseFailure(new Response.FailureListener()
|
||||
{
|
||||
@Override
|
||||
|
@ -541,7 +528,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
})
|
||||
.send(null);
|
||||
|
||||
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue