Fixes #6323 - HttpClient requests with redirects gets stuck/never cal… (#6334)

Fixes #6323 - HttpClient requests with redirects gets stuck/never calls onComplete()

* Reworked the total timeout handling.
* Now a CyclicTimeouts handles the exchanges in each HttpDestination,
and a CyclicTimeouts handles the exchanges in each HttpConnection
(rather than in HttpChannel).
* Now adjusting the total timeout for copied requests generated by
redirects and authentication.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-06-03 15:20:42 +02:00 committed by GitHub
parent f902d12fe8
commit 2e7d17400f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 783 additions and 89 deletions

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -217,8 +218,25 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
// Disable the timeout so that only the one from the initial request applies.
newRequest.timeout(0, TimeUnit.MILLISECONDS);
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed");
forwardFailureComplete(request, failure, response, failure);
return;
}
}
if (path != null)
newRequest.path(path);

View File

@ -19,26 +19,24 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class HttpChannel
public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
private final HttpDestination _destination;
private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;
protected HttpChannel(HttpDestination destination)
{
_destination = destination;
_totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}
public void destroy()
{
_totalTimeout.destroy();
}
public HttpDestination getHttpDestination()
@ -109,6 +107,13 @@ public abstract class HttpChannel
}
}
@Override
public long getExpireNanoTime()
{
HttpExchange exchange = getHttpExchange();
return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
}
protected abstract HttpSender getHttpSender();
protected abstract HttpReceiver getHttpReceiver();
@ -117,16 +122,7 @@ public abstract class HttpChannel
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
long timeoutAt = request.getTimeoutAt();
if (timeoutAt != -1)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(request, timeoutAt);
}
send(exchange);
}
}
public abstract void send(HttpExchange exchange);

View File

@ -22,6 +22,7 @@ import java.net.CookieStore;
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -35,16 +36,19 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public abstract class HttpConnection implements Connection, Attachable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private final HttpDestination destination;
private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;
@ -52,6 +56,7 @@ public abstract class HttpConnection implements Connection, Attachable
protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}
@ -65,6 +70,8 @@ public abstract class HttpConnection implements Connection, Attachable
return destination;
}
protected abstract Iterator<HttpChannel> getHttpChannels();
@Override
public void send(Request request, Response.CompleteListener listener)
{
@ -230,6 +237,7 @@ public abstract class HttpConnection implements Connection, Attachable
SendFailure result;
if (channel.associate(exchange))
{
requestTimeouts.schedule(channel);
channel.send();
result = null;
}
@ -292,9 +300,40 @@ public abstract class HttpConnection implements Connection, Attachable
return attachment;
}
protected void destroy()
{
requestTimeouts.destroy();
}
@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}
private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
protected Iterator<HttpChannel> iterator()
{
return getHttpChannels();
}
@Override
protected boolean onExpired(HttpChannel channel)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}
return false;
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap;
import org.eclipse.jetty.util.log.Log;
@ -143,6 +144,20 @@ public class HttpConversation extends AttributesMap
this.listeners = listeners;
}
/**
* <p>Returns the total timeout for the conversation.</p>
* <p>The conversation total timeout is the total timeout
* of the first request in the conversation.</p>
*
* @return the total timeout of the conversation
* @see Request#getTimeout()
*/
public long getTimeout()
{
HttpExchange firstExchange = exchanges.peekFirst();
return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout();
}
public boolean abort(Throwable cause)
{
HttpExchange exchange = exchanges.peekLast();

View File

@ -22,12 +22,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
@ -36,7 +35,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
@ -55,7 +54,7 @@ import org.eclipse.jetty.util.thread.Sweeper;
@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
{
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
private static final Logger LOG = Log.getLogger(HttpDestination.class);
private final HttpClient client;
private final Origin origin;
@ -270,10 +269,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
if (enqueue(exchanges, exchange))
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
requestTimeouts.schedule(expiresAt);
requestTimeouts.schedule(exchange);
if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
@ -549,63 +545,27 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
/**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
* is enforced in {@link HttpConnection}.</p>
*/
private class RequestTimeouts extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
public void onTimeoutExpired()
protected Iterator<HttpExchange> iterator()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);
long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);
// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
long expiresAt = request.getTimeoutAt();
if (expiresAt == -1)
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < earliest)
earliest = expiresAt;
}
if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
return exchanges.iterator();
}
private void schedule(long expiresAt)
@Override
protected boolean onExpired(HttpExchange exchange)
{
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
return false;
}
}
}

View File

@ -22,10 +22,11 @@ import java.util.List;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpExchange
public class HttpExchange implements CyclicTimeouts.Expirable
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);
@ -86,6 +87,12 @@ public class HttpExchange
}
}
@Override
public long getExpireNanoTime()
{
return request.getTimeoutAt();
}
/**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>

View File

@ -474,9 +474,9 @@ public abstract class HttpReceiver
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -111,8 +112,8 @@ public class HttpRedirector
*/
public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException
{
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Result> resultRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
{
@Override
@ -307,13 +308,29 @@ public class HttpRedirector
}
}
private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
{
try
{
Request redirect = client.copyRequest(httpRequest, location);
// Disable the timeout so that only the one from the initial request applies.
redirect.timeout(0, TimeUnit.MILLISECONDS);
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = httpRequest.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
redirect.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getConversation().getTimeout() + " ms elapsed");
fail(httpRequest, failure, response);
return null;
}
}
// Use given method
redirect.method(method);
@ -330,17 +347,27 @@ public class HttpRedirector
}
catch (Throwable x)
{
fail(httpRequest, response, x);
fail(httpRequest, x, response);
return null;
}
}
protected void fail(Request request, Response response, Throwable failure)
{
fail(request, null, response, failure);
}
protected void fail(Request request, Throwable failure, Response response)
{
fail(request, failure, response, failure);
}
private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, failure);
notifier.notifyComplete(listeners, new Result(request, response, failure));
notifier.notifyFailure(listeners, response, responseFailure);
notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}

View File

@ -80,7 +80,7 @@ public class HttpRequest implements Request
private HttpVersion version = HttpVersion.HTTP_1_1;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
private long timeoutAt = Long.MAX_VALUE;
private ContentProvider content;
private boolean followRedirects;
private List<HttpCookie> cookies;
@ -781,11 +781,12 @@ public class HttpRequest implements Request
void sent()
{
long timeout = getTimeout();
timeoutAt = timeout > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
if (timeout > 0)
timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
}
/**
* @return The nanoTime at which the timeout expires or -1 if there is no timeout.
* @return The nanoTime at which the timeout expires or {@link Long#MAX_VALUE} if there is no timeout.
* @see #timeout(long, TimeUnit)
*/
long getTimeoutAt()

View File

@ -26,10 +26,15 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* @deprecated Do not use it, use {@link CyclicTimeouts} instead.
*/
@Deprecated
public class TimeoutCompleteListener extends CyclicTimeout implements Response.CompleteListener
{
private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class);

View File

@ -20,11 +20,14 @@ package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
@ -243,6 +246,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
super(destination);
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return Collections.<HttpChannel>singleton(channel).iterator();
}
@Override
protected SendFailure send(HttpExchange exchange)
{
@ -264,6 +273,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
public void close()
{
HttpConnectionOverHTTP.this.close();
destroy();
}
@Override

View File

@ -521,6 +521,117 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest
});
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testRedirectToDifferentHostThenRequestToFirstHostExpires(Scenario scenario) throws Exception
{
long timeout = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
if ("/one".equals(target))
{
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), scenario.getScheme() + "://127.0.0.1:" + connector.getLocalPort() + "/two");
}
else if ("/two".equals(target))
{
try
{
// Send another request to "localhost", therefore reusing the
// connection used for the first request, it must timeout.
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/three")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
if (result.getFailure() instanceof TimeoutException)
latch.countDown();
});
// Wait for the request to fail as it should.
assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
}
catch (Throwable x)
{
throw new ServletException(x);
}
}
else if ("/three".equals(target))
{
try
{
// The third request must timeout.
Thread.sleep(2 * timeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/one")
// The timeout should not expire, but must be present to trigger the test conditions.
.timeout(3 * timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testManyRedirectsTotalTimeoutExpires(Scenario scenario) throws Exception
{
long timeout = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
try
{
String serverURI = scenario.getScheme() + "://localhost:" + connector.getLocalPort();
if ("/one".equals(target))
{
Thread.sleep(timeout);
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/two");
}
else if ("/two".equals(target))
{
Thread.sleep(timeout);
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/three");
}
else if ("/three".equals(target))
{
Thread.sleep(2 * timeout);
}
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
assertThrows(TimeoutException.class, () ->
{
client.setMaxRedirects(-1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/one")
.timeout(3 * timeout, TimeUnit.MILLISECONDS)
.send();
});
}
private void testSameMethodRedirect(final Scenario scenario, final HttpMethod method, int redirectCode) throws Exception
{
testMethodRedirect(scenario, method, method, redirectCode);
@ -569,7 +680,7 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest
assertEquals(200, response.getStatus());
}
private class RedirectHandler extends EmptyServerHandler
private static class RedirectHandler extends EmptyServerHandler
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
@ -373,6 +374,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
super(destination);
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return new IteratorWrapper<>(activeChannels.values().iterator());
}
@Override
protected SendFailure send(HttpExchange exchange)
{
@ -391,6 +398,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
public void close()
{
HttpConnectionOverFCGI.this.close();
destroy();
}
protected void close(Throwable failure)
@ -511,4 +519,32 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
LOG.debug("Channel not found for request {}", request);
}
}
private static final class IteratorWrapper<T> implements Iterator<T>
{
private final Iterator<? extends T> iterator;
private IteratorWrapper(Iterator<? extends T> iterator)
{
this.iterator = iterator;
}
@Override
public boolean hasNext()
{
return iterator.hasNext();
}
@Override
public T next()
{
return iterator.next();
}
@Override
public void remove()
{
iterator.remove();
}
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -73,6 +74,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
this.recycleHttpChannels = recycleHttpChannels;
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return activeChannels.iterator();
}
@Override
protected SendFailure send(HttpExchange exchange)
{
@ -133,6 +140,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
public void close()
{
close(new AsynchronousCloseException());
destroy();
}
protected void close(Throwable failure)

View File

@ -44,12 +44,6 @@ import static java.lang.Long.MAX_VALUE;
* session there is a CyclicTimeout and at the beginning of the
* request processing the timeout is canceled (via cancel()), but at
* the end of the request processing the timeout is re-scheduled.</p>
* <p>Another typical scenario is for a parent entity to manage
* the timeouts of many children entities; the timeout is scheduled
* for the child entity that expires the earlier; when the timeout
* expires, the implementation scans the children entities to find
* the expired child entities and to find the next child entity
* that expires the earlier. </p>
* <p>This implementation has a {@link Timeout} holding the time
* at which the scheduled task should fire, and a linked list of
* {@link Wakeup}, each holding the actual scheduled task.</p>
@ -64,6 +58,8 @@ import static java.lang.Long.MAX_VALUE;
* When the Wakeup task fires, it will see that the Timeout is now
* in the future and will attach a new Wakeup with the future time
* to the Timeout, and submit a scheduler task for the new Wakeup.</p>
*
* @see CyclicTimeouts
*/
public abstract class CyclicTimeout implements Destroyable
{

View File

@ -0,0 +1,199 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>An implementation of a timeout that manages many {@link Expirable expirable} entities whose
* timeouts are mostly cancelled or re-scheduled.</p>
* <p>A typical scenario is for a parent entity to manage the timeouts of many children entities.</p>
* <p>When a new entity is created, call {@link #schedule(Expirable)} with the new entity so that
* this instance can be aware and manage the timeout of the new entity.</p>
* <p>Eventually, this instance wakes up and iterates over the entities provided by {@link #iterator()}.
* During the iteration, each entity:</p>
* <ul>
* <li>may never expire (see {@link Expirable#getExpireNanoTime()}; the entity is ignored</li>
* <li>may be expired; {@link #onExpired(Expirable)} is called with that entity as parameter</li>
* <li>may expire at a future time; the iteration records the earliest expiration time among
* all non-expired entities</li>
* </ul>
* <p>When the iteration is complete, this instance is re-scheduled with the earliest expiration time
* calculated during the iteration.</p>
*
* @param <T> the {@link Expirable} entity type
* @see CyclicTimeout
*/
public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> implements Destroyable
{
private static final Logger LOG = Log.getLogger(CyclicTimeouts.class);
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);
private final CyclicTimeout cyclicTimeout;
public CyclicTimeouts(Scheduler scheduler)
{
cyclicTimeout = new Timeouts(scheduler);
}
/**
* @return the entities to iterate over when this instance expires
*/
protected abstract Iterator<T> iterator();
/**
* <p>Invoked during the iteration when the given entity is expired.</p>
* <p>This method may be invoked multiple times, and even concurrently,
* for the same expirable entity and therefore the expiration of the
* entity, if any, should be an idempotent action.</p>
*
* @param expirable the entity that is expired
* @return whether the entity should be removed from the iterator via {@link Iterator#remove()}
*/
protected abstract boolean onExpired(T expirable);
private void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("Timeouts check for {}", this);
long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an
// earliest value, but the corresponding entity will
// be seen during the iteration below.
earliestTimeout.set(earliest);
Iterator<T> iterator = iterator();
if (iterator == null)
return;
// Scan the entities to abort expired entities
// and to find the entity that expires the earliest.
while (iterator.hasNext())
{
T expirable = iterator.next();
long expiresAt = expirable.getExpireNanoTime();
if (LOG.isDebugEnabled())
LOG.debug("Entity {} expires in {} ms for {}", expirable, TimeUnit.NANOSECONDS.toMillis(expiresAt - now), this);
if (expiresAt == -1)
continue;
if (expiresAt <= now)
{
boolean remove = onExpired(expirable);
if (LOG.isDebugEnabled())
LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this);
if (remove)
iterator.remove();
continue;
}
earliest = Math.min(earliest, expiresAt);
}
if (earliest < Long.MAX_VALUE)
schedule(earliest);
}
/**
* <p>Manages the timeout of a new entity.</p>
*
* @param expirable the new entity to manage the timeout for
*/
public void schedule(T expirable)
{
long expiresAt = expirable.getExpireNanoTime();
if (expiresAt < Long.MAX_VALUE)
schedule(expiresAt);
}
private void schedule(long expiresAt)
{
// Schedule a timeout for the earliest entity that may expire.
// When the timeout expires, scan the entities for the next
// earliest entity that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
long expires = expiresAt;
while (expires < prevEarliest)
{
// A new entity expires earlier than previous entities, schedule it.
long delay = Math.max(0, expires - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Scheduling timeout in {} ms for {}", TimeUnit.NANOSECONDS.toMillis(delay), this);
schedule(cyclicTimeout, delay, TimeUnit.NANOSECONDS);
// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
prevEarliest = expires;
expires = earliestTimeout.get();
}
}
@Override
public void destroy()
{
cyclicTimeout.destroy();
}
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
{
return cyclicTimeout.schedule(delay, unit);
}
/**
* <p>An entity that may expire.</p>
*/
public interface Expirable
{
/**
* <p>Returns the expiration time in nanoseconds.</p>
* <p>The value to return must be calculated taking into account {@link System#nanoTime()},
* for example:</p>
* {@code expireNanoTime = System.nanoTime() + timeoutNanos}
* <p>Returning {@link Long#MAX_VALUE} indicates that this entity does not expire.</p>
*
* @return the expiration time in nanoseconds, or {@link Long#MAX_VALUE} if this entity does not expire
*/
public long getExpireNanoTime();
}
private class Timeouts extends CyclicTimeout
{
private Timeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
public void onTimeoutExpired()
{
CyclicTimeouts.this.onTimeoutExpired();
}
}
}

View File

@ -0,0 +1,266 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CyclicTimeoutsTest
{
private Scheduler scheduler;
private CyclicTimeouts<ConstantExpirable> timeouts;
@BeforeEach
public void prepare()
{
scheduler = new ScheduledExecutorScheduler();
LifeCycle.start(scheduler);
}
@AfterEach
public void dispose()
{
if (timeouts != null)
timeouts.destroy();
LifeCycle.stop(scheduler);
}
@Test
public void testNoExpirationForNonExpiringEntity() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<ConstantExpirable>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
latch.countDown();
return null;
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
return false;
}
};
// Schedule an entity that does not expire.
timeouts.schedule(ConstantExpirable.noExpire());
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
}
@Test
public void testScheduleZero() throws Exception
{
ConstantExpirable entity = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
CountDownLatch iteratorLatch = new CountDownLatch(1);
CountDownLatch expiredLatch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<ConstantExpirable>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
iteratorLatch.countDown();
return Collections.emptyIterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
expiredLatch.countDown();
return false;
}
};
timeouts.schedule(entity);
Assertions.assertTrue(iteratorLatch.await(1, TimeUnit.SECONDS));
Assertions.assertFalse(expiredLatch.await(1, TimeUnit.SECONDS));
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testIterateAndExpire(boolean remove) throws Exception
{
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
Collection<ConstantExpirable> collection = new ArrayList<>();
collection.add(one);
AtomicInteger iterations = new AtomicInteger();
CountDownLatch expiredLatch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<ConstantExpirable>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
iterations.incrementAndGet();
return collection.iterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
assertSame(one, expirable);
expiredLatch.countDown();
return remove;
}
};
// Triggers immediate call to iterator(), which
// returns an entity that expires in 1 second.
timeouts.schedule(zero);
// After 1 second there is a second call to
// iterator(), which returns the now expired
// entity, which is passed to onExpired().
assertTrue(expiredLatch.await(2, TimeUnit.SECONDS));
// Wait for the collection to be processed
// with the return value of onExpired().
Thread.sleep(1000);
// Verify the processing of the return value of onExpired().
assertEquals(remove ? 0 : 1, collection.size());
// Wait to see if iterator() is called again (it should not).
Thread.sleep(1000);
assertEquals(2, iterations.get());
}
@Test
public void testScheduleOvertake() throws Exception
{
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
long delayMs = 2000;
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
Collection<ConstantExpirable> collection = new ArrayList<>();
collection.add(two);
CountDownLatch expiredLatch = new CountDownLatch(2);
List<ConstantExpirable> expired = new ArrayList<>();
timeouts = new CyclicTimeouts<ConstantExpirable>(scheduler)
{
private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
@Override
protected Iterator<ConstantExpirable> iterator()
{
return collection.iterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
expired.add(expirable);
expiredLatch.countDown();
return true;
}
@Override
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
{
if (delay <= 0)
return super.schedule(cyclicTimeout, delay, unit);
// Simulate that an entity with a shorter timeout
// overtakes the entity that is currently being scheduled.
// Only schedule the overtake once.
if (overtakeScheduled.compareAndSet(false, true))
{
collection.add(overtake);
schedule(overtake);
}
return super.schedule(cyclicTimeout, delay, unit);
}
};
// Trigger the initial call to iterator().
timeouts.schedule(zero);
// Make sure that the overtake entity expires properly.
assertTrue(expiredLatch.await(2 * delayMs, TimeUnit.MILLISECONDS));
// Make sure all entities expired properly.
assertSame(overtake, expired.get(0));
assertSame(two, expired.get(1));
}
private static class ConstantExpirable implements CyclicTimeouts.Expirable
{
private static ConstantExpirable noExpire()
{
return new ConstantExpirable();
}
private static ConstantExpirable ofDelay(long delay, TimeUnit unit)
{
return new ConstantExpirable(delay, unit);
}
private final long expireNanos;
private final String asString;
private ConstantExpirable()
{
this.expireNanos = Long.MAX_VALUE;
this.asString = "noexp";
}
public ConstantExpirable(long delay, TimeUnit unit)
{
this.expireNanos = System.nanoTime() + unit.toNanos(delay);
this.asString = String.valueOf(unit.toMillis(delay));
}
@Override
public long getExpireNanoTime()
{
return expireNanos;
}
@Override
public String toString()
{
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
}
}
}