Various improvements to CyclicTimeouts. (#9897)

* Improved reset of the earliest timeout before iteration.
* Removed check for getExpireNanoTime() == -1, since it's a valid value.
* When onExpired(Expirable) returns false, the Expirable should arrange to move its timeout in the future.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-06-10 17:02:37 +02:00 committed by GitHub
parent d5200c6c63
commit df24485000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 120 additions and 35 deletions

View File

@ -315,6 +315,11 @@ public abstract class HttpConnection implements IConnection, Attachable
return String.format("%s@%h", getClass().getSimpleName(), this); return String.format("%s@%h", getClass().getSimpleName(), this);
} }
/**
* <p>Enforces the total timeout for requests that have been sent.</p>
* <p>The total timeout for exchanges that are in the destination queue
* is enforced in {@link HttpDestination}.</p>
*/
private class RequestTimeouts extends CyclicTimeouts<HttpChannel> private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{ {
private RequestTimeouts(Scheduler scheduler) private RequestTimeouts(Scheduler scheduler)
@ -332,11 +337,14 @@ public abstract class HttpConnection implements IConnection, Attachable
protected boolean onExpired(HttpChannel channel) protected boolean onExpired(HttpChannel channel)
{ {
HttpExchange exchange = channel.getHttpExchange(); HttpExchange exchange = channel.getHttpExchange();
if (exchange != null) // The expiration lost the race, as the
{ // exchange may have just been completed.
HttpRequest request = exchange.getRequest(); if (exchange == null)
request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed")); return false;
} HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed"));
// The implementation of the Iterator returned above may not support
// removal, but the HttpChannel will be removed by request.abort().
return false; return false;
} }
} }

View File

@ -550,7 +550,7 @@ public class HttpDestination extends ContainerLifeCycle implements Destination,
} }
/** /**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p> * <p>Enforces the total timeout for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue * <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpConnection}.</p> * is enforced in {@link HttpConnection}.</p>
*/ */
@ -572,6 +572,8 @@ public class HttpDestination extends ContainerLifeCycle implements Destination,
{ {
HttpRequest request = exchange.getRequest(); HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed")); request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed"));
// The implementation of the Iterator returned above does not support
// removal, but the HttpExchange will be removed by request.abort().
return false; return false;
} }
} }

View File

@ -2385,6 +2385,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
protected boolean onExpired(HTTP2Stream stream) protected boolean onExpired(HTTP2Stream stream)
{ {
stream.onIdleTimeout(new TimeoutException("Idle timeout " + stream.getIdleTimeout() + " ms elapsed")); stream.onIdleTimeout(new TimeoutException("Idle timeout " + stream.getIdleTimeout() + " ms elapsed"));
// The implementation of the Iterator returned above does not support
// removal, but the HTTP2Stream will be removed by stream.onIdleTimeout().
return false; return false;
} }
} }

View File

@ -311,10 +311,9 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
notifyIdleTimeout(this, timeout, Promise.from(timedOut -> notifyIdleTimeout(this, timeout, Promise.from(timedOut ->
{ {
if (timedOut) if (timedOut)
{
// Tell the other peer that we timed out.
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
} else
notIdle();
}, x -> reset(new ResetFrame(getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP))); }, x -> reset(new ResetFrame(getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP)));
} }

View File

@ -46,7 +46,7 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
private CloseState closeState = CloseState.NOT_CLOSED; private CloseState closeState = CloseState.NOT_CLOSED;
private FrameState frameState = FrameState.INITIAL; private FrameState frameState = FrameState.INITIAL;
private long idleTimeout; private long idleTimeout;
private long expireNanoTime; private long expireNanoTime = Long.MAX_VALUE;
private Object attachment; private Object attachment;
private boolean dataDemand; private boolean dataDemand;
private boolean dataStalled; private boolean dataStalled;
@ -129,6 +129,8 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
{ {
if (timedOut) if (timedOut)
endPoint.close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout); endPoint.close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout);
else
notIdle();
promise.succeeded(timedOut); promise.succeeded(timedOut);
}, promise::failed)); }, promise::failed));
} }

View File

@ -65,6 +65,10 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
* <p>This method may be invoked multiple times, and even concurrently, * <p>This method may be invoked multiple times, and even concurrently,
* for the same expirable entity and therefore the expiration of the * for the same expirable entity and therefore the expiration of the
* entity, if any, should be an idempotent action.</p> * entity, if any, should be an idempotent action.</p>
* <p>When {@code false} is returned, the implementation should adjust
* the {@link Expirable} expiration, so that a call to
* {@link Expirable#getExpireNanoTime()} after this method has returned
* yields a new expiration nanoTime.</p>
* *
* @param expirable the entity that is expired * @param expirable the entity that is expired
* @return whether the entity should be removed from the iterator via {@link Iterator#remove()} * @return whether the entity should be removed from the iterator via {@link Iterator#remove()}
@ -78,11 +82,10 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
long now = NanoTime.now(); long now = NanoTime.now();
long earliest = Long.MAX_VALUE; long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again. // Move the earliest timeout far in the future, so we can expire again.
// A concurrent call to schedule(long) may lose an // A concurrent call to schedule(long) may lose an earliest value, but
// earliest value, but the corresponding entity will // the corresponding entity will be seen during the iteration below.
// be seen during the iteration below. earliestTimeout.set(now + Long.MAX_VALUE);
earliestTimeout.set(earliest);
Iterator<T> iterator = iterator(); Iterator<T> iterator = iterator();
if (iterator == null) if (iterator == null)
@ -98,23 +101,26 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this); LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this);
if (expiresAt == -1)
continue;
if (NanoTime.isBeforeOrSame(expiresAt, now)) if (NanoTime.isBeforeOrSame(expiresAt, now))
{ {
boolean remove = onExpired(expirable); boolean remove = onExpired(expirable);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this); LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this);
if (remove) if (remove)
{
iterator.remove(); iterator.remove();
continue; continue;
}
long newExpiresAt = expirable.getExpireNanoTime();
if (newExpiresAt == expiresAt)
continue;
expiresAt = newExpiresAt;
} }
earliest = Math.min(earliest, NanoTime.elapsed(now, expiresAt)); earliest = Math.min(earliest, NanoTime.elapsed(now, expiresAt));
} }
if (earliest < Long.MAX_VALUE) if (earliest != Long.MAX_VALUE)
schedule(now + earliest); schedule(now + earliest);
} }
@ -126,7 +132,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
public void schedule(T expirable) public void schedule(T expirable)
{ {
long expiresAt = expirable.getExpireNanoTime(); long expiresAt = expirable.getExpireNanoTime();
if (expiresAt < Long.MAX_VALUE) if (expiresAt != Long.MAX_VALUE)
schedule(expiresAt); schedule(expiresAt);
} }

View File

@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CyclicTimeoutsTest public class CyclicTimeoutsTest
{ {
private Scheduler scheduler; private Scheduler scheduler;
private CyclicTimeouts<ConstantExpirable> timeouts; private CyclicTimeouts<CyclicTimeouts.Expirable> timeouts;
@BeforeEach @BeforeEach
public void prepare() public void prepare()
@ -65,14 +65,14 @@ public class CyclicTimeoutsTest
timeouts = new CyclicTimeouts<>(scheduler) timeouts = new CyclicTimeouts<>(scheduler)
{ {
@Override @Override
protected Iterator<ConstantExpirable> iterator() protected Iterator<CyclicTimeouts.Expirable> iterator()
{ {
latch.countDown(); latch.countDown();
return null; return null;
} }
@Override @Override
protected boolean onExpired(ConstantExpirable expirable) protected boolean onExpired(CyclicTimeouts.Expirable expirable)
{ {
return false; return false;
} }
@ -93,14 +93,14 @@ public class CyclicTimeoutsTest
timeouts = new CyclicTimeouts<>(scheduler) timeouts = new CyclicTimeouts<>(scheduler)
{ {
@Override @Override
protected Iterator<ConstantExpirable> iterator() protected Iterator<CyclicTimeouts.Expirable> iterator()
{ {
iteratorLatch.countDown(); iteratorLatch.countDown();
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
@Override @Override
protected boolean onExpired(ConstantExpirable expirable) protected boolean onExpired(CyclicTimeouts.Expirable expirable)
{ {
expiredLatch.countDown(); expiredLatch.countDown();
return false; return false;
@ -118,22 +118,22 @@ public class CyclicTimeoutsTest
public void testIterateAndExpire(boolean remove) throws Exception public void testIterateAndExpire(boolean remove) throws Exception
{ {
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS); ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS); DynamicExpirable one = new DynamicExpirable(NanoTime.now() + TimeUnit.SECONDS.toNanos(1));
Collection<ConstantExpirable> collection = new ArrayList<>(); Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
collection.add(one); collection.add(one);
AtomicInteger iterations = new AtomicInteger(); AtomicInteger iterations = new AtomicInteger();
CountDownLatch expiredLatch = new CountDownLatch(1); CountDownLatch expiredLatch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<>(scheduler) timeouts = new CyclicTimeouts<>(scheduler)
{ {
@Override @Override
protected Iterator<ConstantExpirable> iterator() protected Iterator<CyclicTimeouts.Expirable> iterator()
{ {
iterations.incrementAndGet(); iterations.incrementAndGet();
return collection.iterator(); return collection.iterator();
} }
@Override @Override
protected boolean onExpired(ConstantExpirable expirable) protected boolean onExpired(CyclicTimeouts.Expirable expirable)
{ {
assertSame(one, expirable); assertSame(one, expirable);
expiredLatch.countDown(); expiredLatch.countDown();
@ -169,22 +169,22 @@ public class CyclicTimeoutsTest
long delayMs = 2000; long delayMs = 2000;
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS); ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS); ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
Collection<ConstantExpirable> collection = new ArrayList<>(); Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
collection.add(two); collection.add(two);
CountDownLatch expiredLatch = new CountDownLatch(2); CountDownLatch expiredLatch = new CountDownLatch(2);
List<ConstantExpirable> expired = new ArrayList<>(); List<CyclicTimeouts.Expirable> expired = new ArrayList<>();
timeouts = new CyclicTimeouts<>(scheduler) timeouts = new CyclicTimeouts<>(scheduler)
{ {
private final AtomicBoolean overtakeScheduled = new AtomicBoolean(); private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
@Override @Override
protected Iterator<ConstantExpirable> iterator() protected Iterator<CyclicTimeouts.Expirable> iterator()
{ {
return collection.iterator(); return collection.iterator();
} }
@Override @Override
protected boolean onExpired(ConstantExpirable expirable) protected boolean onExpired(CyclicTimeouts.Expirable expirable)
{ {
expired.add(expirable); expired.add(expirable);
expiredLatch.countDown(); expiredLatch.countDown();
@ -220,6 +220,39 @@ public class CyclicTimeoutsTest
assertSame(two, expired.get(1)); assertSame(two, expired.get(1));
} }
@Test
public void testDynamicExpirableEntityIsNotifiedMultipleTimes() throws Exception
{
long delay = 500;
DynamicExpirable entity = new DynamicExpirable(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(delay));
List<CyclicTimeouts.Expirable> entities = List.of(entity);
CountDownLatch latch = new CountDownLatch(2);
timeouts = new CyclicTimeouts<>(scheduler)
{
@Override
protected Iterator<CyclicTimeouts.Expirable> iterator()
{
return entities.iterator();
}
@Override
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
{
assertSame(entity, expirable);
// Postpone expiration.
entity.expireNanoTime = NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(delay);
latch.countDown();
return false;
}
};
// Trigger the initial call to iterator().
timeouts.schedule(entities.get(0));
assertTrue(latch.await(3 * delay, TimeUnit.MILLISECONDS), latch.toString());
}
private static class ConstantExpirable implements CyclicTimeouts.Expirable private static class ConstantExpirable implements CyclicTimeouts.Expirable
{ {
private static ConstantExpirable noExpire() private static ConstantExpirable noExpire()
@ -259,4 +292,26 @@ public class CyclicTimeoutsTest
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString); return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
} }
} }
private static class DynamicExpirable implements CyclicTimeouts.Expirable
{
private long expireNanoTime;
public DynamicExpirable(long expireNanoTime)
{
this.expireNanoTime = expireNanoTime;
}
@Override
public long getExpireNanoTime()
{
return expireNanoTime;
}
@Override
public String toString()
{
return String.format("%s@%x[%dms]", getClass().getSimpleName(), hashCode(), NanoTime.millisUntil(expireNanoTime));
}
}
} }

View File

@ -133,6 +133,8 @@ public class ServerQuicConnection extends QuicConnection
protected boolean onExpired(ServerQuicSession session) protected boolean onExpired(ServerQuicSession session)
{ {
session.onIdleTimeout(); session.onIdleTimeout();
// The implementation of the Iterator returned above does not support
// removal, but the session will be removed by session.onIdleTimeout().
return false; return false;
} }
} }

View File

@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable
{ {
private final Connector connector; private final Connector connector;
private long expireNanoTime; private long expireNanoTime = Long.MAX_VALUE;
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector) protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
{ {
@ -103,6 +103,15 @@ public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Exp
getQuicConnection().schedule(this); getQuicConnection().schedule(this);
} }
@Override
public boolean onIdleTimeout()
{
boolean result = super.onIdleTimeout();
if (!result)
notIdle();
return result;
}
@Override @Override
public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{ {