Fixes #6254 - Total timeout not enforced for queued requests.
Fixed logic in HttpDestination.RequestTimeouts, where now a timeout is scheduled only when the expiration time is less than the existing one. Various code cleanups. Renamed HttpDestination.TimeoutTask to RequestTimeouts for clarity. Improved javadocs, code comments and logging. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> (cherry picked from commit5f23689aa7
) (cherry picked from commitda50e06b64
) (cherry picked from commit88ac10439a
)
This commit is contained in:
parent
54e47612f2
commit
de2d764290
|
@ -60,7 +60,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
private final ProxyConfiguration.Proxy proxy;
|
||||
private final ClientConnectionFactory connectionFactory;
|
||||
private final HttpField hostField;
|
||||
private final TimeoutTask timeout;
|
||||
private final RequestTimeouts requestTimeouts;
|
||||
private ConnectionPool connectionPool;
|
||||
|
||||
public HttpDestination(HttpClient client, Origin origin)
|
||||
|
@ -73,7 +73,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
this.requestNotifier = new RequestNotifier(client);
|
||||
this.responseNotifier = new ResponseNotifier();
|
||||
|
||||
this.timeout = new TimeoutTask(client.getScheduler());
|
||||
this.requestTimeouts = new RequestTimeouts(client.getScheduler());
|
||||
|
||||
String host = HostPort.normalizeHost(getHost());
|
||||
if (!client.isDefaultPort(getScheme(), getPort()))
|
||||
|
@ -257,7 +257,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
long expiresAt = request.getTimeoutAt();
|
||||
if (expiresAt != -1)
|
||||
timeout.schedule(expiresAt);
|
||||
requestTimeouts.schedule(expiresAt);
|
||||
|
||||
if (!client.isRunning() && exchanges.remove(exchange))
|
||||
{
|
||||
|
@ -409,7 +409,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Closed {}", this);
|
||||
connectionPool.close();
|
||||
timeout.destroy();
|
||||
requestTimeouts.destroy();
|
||||
}
|
||||
|
||||
public void release(Connection connection)
|
||||
|
@ -527,15 +527,15 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
}
|
||||
|
||||
/**
|
||||
* This class enforces the total timeout for exchanges that are still in the queue.
|
||||
* The total timeout for exchanges that are not in the destination queue is enforced
|
||||
* by {@link HttpChannel}.
|
||||
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
|
||||
* <p>The total timeout for exchanges that are not in the destination queue
|
||||
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
|
||||
*/
|
||||
private class TimeoutTask extends CyclicTimeout
|
||||
private class RequestTimeouts extends CyclicTimeout
|
||||
{
|
||||
private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE);
|
||||
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);
|
||||
|
||||
private TimeoutTask(Scheduler scheduler)
|
||||
private RequestTimeouts(Scheduler scheduler)
|
||||
{
|
||||
super(scheduler);
|
||||
}
|
||||
|
@ -544,14 +544,18 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
public void onTimeoutExpired()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} timeout expired", this);
|
||||
LOG.debug("{} timeouts check", this);
|
||||
|
||||
nextTimeout.set(Long.MAX_VALUE);
|
||||
long now = System.nanoTime();
|
||||
long nextExpiresAt = Long.MAX_VALUE;
|
||||
long earliest = Long.MAX_VALUE;
|
||||
// Reset the earliest timeout so we can expire again.
|
||||
// A concurrent call to schedule(long) may lose an earliest
|
||||
// value, but the corresponding exchange is already enqueued
|
||||
// and will be seen by scanning the exchange queue below.
|
||||
earliestTimeout.set(earliest);
|
||||
|
||||
// Check all queued exchanges for those that have expired
|
||||
// and to determine when the next check must be.
|
||||
// Scan the message queue to abort expired exchanges
|
||||
// and to find the exchange that expire the earliest.
|
||||
for (HttpExchange exchange : exchanges)
|
||||
{
|
||||
HttpRequest request = exchange.getRequest();
|
||||
|
@ -560,34 +564,27 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
continue;
|
||||
if (expiresAt <= now)
|
||||
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
|
||||
else if (expiresAt < nextExpiresAt)
|
||||
nextExpiresAt = expiresAt;
|
||||
else if (expiresAt < earliest)
|
||||
earliest = expiresAt;
|
||||
}
|
||||
|
||||
if (nextExpiresAt < Long.MAX_VALUE && client.isRunning())
|
||||
schedule(nextExpiresAt);
|
||||
if (earliest < Long.MAX_VALUE && client.isRunning())
|
||||
schedule(earliest);
|
||||
}
|
||||
|
||||
private void schedule(long expiresAt)
|
||||
{
|
||||
// Schedule a timeout for the soonest any known exchange can expire.
|
||||
// If subsequently that exchange is removed from the queue, the
|
||||
// timeout is not cancelled, instead the entire queue is swept
|
||||
// for expired exchanges and a new timeout is set.
|
||||
long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt));
|
||||
if (timeoutAt != expiresAt)
|
||||
// Schedule a timeout for the earliest exchange that may expire.
|
||||
// When the timeout expires, scan the exchange queue for the next
|
||||
// earliest exchange that may expire, and reschedule a new timeout.
|
||||
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
|
||||
if (expiresAt < prevEarliest)
|
||||
{
|
||||
long delay = expiresAt - System.nanoTime();
|
||||
if (delay <= 0)
|
||||
{
|
||||
onTimeoutExpired();
|
||||
}
|
||||
else
|
||||
{
|
||||
schedule(delay, TimeUnit.NANOSECONDS);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
|
||||
}
|
||||
// A new request expires earlier than previous requests, schedule it.
|
||||
long delay = Math.max(0, expiresAt - System.nanoTime());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
|
||||
schedule(delay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TimeoutCompleteListener.class);
|
||||
|
||||
private final AtomicReference<Request> request = new AtomicReference<>();
|
||||
private final AtomicReference<Request> requestTimeout = new AtomicReference<>();
|
||||
|
||||
public TimeoutCompleteListener(Scheduler scheduler)
|
||||
{
|
||||
|
@ -39,7 +39,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
|
|||
@Override
|
||||
public void onTimeoutExpired()
|
||||
{
|
||||
Request request = this.request.getAndSet(null);
|
||||
Request request = requestTimeout.getAndSet(null);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this);
|
||||
if (request != null)
|
||||
|
@ -49,7 +49,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
|
|||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Request request = this.request.getAndSet(null);
|
||||
Request request = requestTimeout.getAndSet(null);
|
||||
if (request != null)
|
||||
{
|
||||
boolean cancelled = cancel();
|
||||
|
@ -60,19 +60,12 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
|
|||
|
||||
void schedule(HttpRequest request, long timeoutAt)
|
||||
{
|
||||
if (this.request.compareAndSet(null, request))
|
||||
if (requestTimeout.compareAndSet(null, request))
|
||||
{
|
||||
long delay = timeoutAt - System.nanoTime();
|
||||
if (delay <= 0)
|
||||
{
|
||||
onTimeoutExpired();
|
||||
}
|
||||
else
|
||||
{
|
||||
schedule(delay, TimeUnit.NANOSECONDS);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
|
||||
}
|
||||
long delay = Math.max(0, timeoutAt - System.nanoTime());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
|
||||
schedule(delay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,23 @@ import static java.lang.Long.MAX_VALUE;
|
|||
* <p>Subclasses should implement {@link #onTimeoutExpired()}.</p>
|
||||
* <p>This implementation is optimised assuming that the timeout
|
||||
* will mostly be cancelled and then reused with a similar value.</p>
|
||||
* <p>The typical scenario to use this class is when you have events
|
||||
* that postpone (by re-scheduling), or cancel then re-schedule, a
|
||||
* timeout for a single entity.
|
||||
* For example: connection idleness, where for each connection there
|
||||
* is a CyclicTimeout and a read/write postpones the timeout; when
|
||||
* the timeout expires, the implementation checks against a timestamp
|
||||
* if the connection is really idle.
|
||||
* Another example: HTTP session expiration, where for each HTTP
|
||||
* session there is a CyclicTimeout and at the beginning of the
|
||||
* request processing the timeout is canceled (via cancel()), but at
|
||||
* the end of the request processing the timeout is re-scheduled.</p>
|
||||
* <p>Another typical scenario is for a parent entity to manage
|
||||
* the timeouts of many children entities; the timeout is scheduled
|
||||
* for the child entity that expires the earlier; when the timeout
|
||||
* expires, the implementation scans the children entities to find
|
||||
* the expired child entities and to find the next child entity
|
||||
* that expires the earlier. </p>
|
||||
* <p>This implementation has a {@link Timeout} holding the time
|
||||
* at which the scheduled task should fire, and a linked list of
|
||||
* {@link Wakeup}, each holding the actual scheduled task.</p>
|
||||
|
@ -97,9 +114,12 @@ public abstract class CyclicTimeout implements Destroyable
|
|||
if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup)))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Installed timeout in {} ms, waking up in {} ms",
|
||||
{
|
||||
LOG.debug("Installed timeout in {} ms, {} wake up in {} ms",
|
||||
units.toMillis(delay),
|
||||
newWakeup != null ? "new" : "existing",
|
||||
TimeUnit.NANOSECONDS.toMillis(wakeup._at - now));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,12 +49,14 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
|||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Assumptions;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
import org.opentest4j.TestAbortedException;
|
||||
|
||||
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -494,6 +496,83 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
|
|||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
if (request.getRequestURI().startsWith("/one"))
|
||||
{
|
||||
try
|
||||
{
|
||||
serverLatch.await();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
scenario.client.setMaxConnectionsPerDestination(1);
|
||||
scenario.setMaxRequestsPerConnection(1);
|
||||
|
||||
// Send the first request so that the others get queued.
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.path("/one")
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
latch1.countDown();
|
||||
});
|
||||
|
||||
// Queue a second request, it should expire in the queue.
|
||||
long timeout = 1000;
|
||||
CountDownLatch latch2 = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.path("/two")
|
||||
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isFailed());
|
||||
assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class));
|
||||
latch2.countDown();
|
||||
});
|
||||
|
||||
Thread.sleep(timeout);
|
||||
|
||||
// Queue a third request, it should not reset the timeout of the second request.
|
||||
CountDownLatch latch3 = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.path("/three")
|
||||
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
latch3.countDown();
|
||||
});
|
||||
|
||||
// We have already slept a timeout, expect the second request to be back in another timeout.
|
||||
assertTrue(latch2.await(2 * timeout, TimeUnit.MILLISECONDS));
|
||||
|
||||
// Release the first request so the third can be served as well.
|
||||
serverLatch.countDown();
|
||||
|
||||
assertTrue(latch1.await(2 * timeout, TimeUnit.MILLISECONDS));
|
||||
assertTrue(latch3.await(2 * timeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void assumeConnectTimeout(String host, int port, int connectTimeout)
|
||||
{
|
||||
try (Socket socket = new Socket())
|
||||
|
|
|
@ -266,6 +266,13 @@ public class TransportScenario
|
|||
setConnectionIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
|
||||
{
|
||||
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
|
||||
if (h2 != null)
|
||||
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
|
||||
}
|
||||
|
||||
public void start(Handler handler) throws Exception
|
||||
{
|
||||
start(handler, null);
|
||||
|
|
Loading…
Reference in New Issue