Fixes #10120 - OOME caused by CyclicTimeouts.
Fixed handling of Expirable.getExpireNanoTime() in case it returns Long.MAX_VALUE. Also fixed implementations of Expirable that were not initializing their expireNanoTime field to Long.MAX_VALUE. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
9e16d81cf8
commit
e7a088f3f0
|
@ -42,7 +42,7 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
|
||||||
private CloseState closeState = CloseState.NOT_CLOSED;
|
private CloseState closeState = CloseState.NOT_CLOSED;
|
||||||
private FrameState frameState = FrameState.INITIAL;
|
private FrameState frameState = FrameState.INITIAL;
|
||||||
private long idleTimeout;
|
private long idleTimeout;
|
||||||
private long expireNanoTime;
|
private long expireNanoTime = Long.MAX_VALUE;
|
||||||
private Object attachment;
|
private Object attachment;
|
||||||
|
|
||||||
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
|
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
|
||||||
|
|
|
@ -47,7 +47,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
||||||
{
|
{
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(CyclicTimeouts.class);
|
private static final Logger LOG = LoggerFactory.getLogger(CyclicTimeouts.class);
|
||||||
|
|
||||||
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);
|
private final AtomicLong earliestNanoTime = new AtomicLong(Long.MAX_VALUE);
|
||||||
private final CyclicTimeout cyclicTimeout;
|
private final CyclicTimeout cyclicTimeout;
|
||||||
|
|
||||||
public CyclicTimeouts(Scheduler scheduler)
|
public CyclicTimeouts(Scheduler scheduler)
|
||||||
|
@ -82,7 +82,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
||||||
// A concurrent call to schedule(long) may lose an
|
// A concurrent call to schedule(long) may lose an
|
||||||
// earliest value, but the corresponding entity will
|
// earliest value, but the corresponding entity will
|
||||||
// be seen during the iteration below.
|
// be seen during the iteration below.
|
||||||
earliestTimeout.set(earliest);
|
earliestNanoTime.set(earliest);
|
||||||
|
|
||||||
Iterator<T> iterator = iterator();
|
Iterator<T> iterator = iterator();
|
||||||
if (iterator == null)
|
if (iterator == null)
|
||||||
|
@ -95,12 +95,16 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
||||||
T expirable = iterator.next();
|
T expirable = iterator.next();
|
||||||
long expiresAt = expirable.getExpireNanoTime();
|
long expiresAt = expirable.getExpireNanoTime();
|
||||||
|
|
||||||
|
if (expiresAt == Long.MAX_VALUE)
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Entity {} does not expire for {}", expirable, this);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this);
|
LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this);
|
||||||
|
|
||||||
if (expiresAt == -1)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (NanoTime.isBeforeOrSame(expiresAt, now))
|
if (NanoTime.isBeforeOrSame(expiresAt, now))
|
||||||
{
|
{
|
||||||
boolean remove = onExpired(expirable);
|
boolean remove = onExpired(expirable);
|
||||||
|
@ -135,7 +139,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
||||||
// Schedule a timeout for the earliest entity that may expire.
|
// Schedule a timeout for the earliest entity that may expire.
|
||||||
// When the timeout expires, scan the entities for the next
|
// When the timeout expires, scan the entities for the next
|
||||||
// earliest entity that may expire, and reschedule a new timeout.
|
// earliest entity that may expire, and reschedule a new timeout.
|
||||||
long prevEarliest = earliestTimeout.getAndUpdate(t -> NanoTime.isBefore(t, expiresAt) ? t : expiresAt);
|
long prevEarliest = earliestNanoTime.getAndUpdate(t -> NanoTime.isBefore(t, expiresAt) ? t : expiresAt);
|
||||||
long expires = expiresAt;
|
long expires = expiresAt;
|
||||||
while (NanoTime.isBefore(expires, prevEarliest))
|
while (NanoTime.isBefore(expires, prevEarliest))
|
||||||
{
|
{
|
||||||
|
@ -148,7 +152,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
||||||
// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
|
// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
|
||||||
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
|
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
|
||||||
prevEarliest = expires;
|
prevEarliest = expires;
|
||||||
expires = earliestTimeout.get();
|
expires = earliestNanoTime.get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
public class CyclicTimeoutsTest
|
public class CyclicTimeoutsTest
|
||||||
{
|
{
|
||||||
private Scheduler scheduler;
|
private Scheduler scheduler;
|
||||||
private CyclicTimeouts<ConstantExpirable> timeouts;
|
private CyclicTimeouts<CyclicTimeouts.Expirable> timeouts;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void prepare()
|
public void prepare()
|
||||||
|
@ -65,14 +65,14 @@ public class CyclicTimeoutsTest
|
||||||
timeouts = new CyclicTimeouts<>(scheduler)
|
timeouts = new CyclicTimeouts<>(scheduler)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected Iterator<ConstantExpirable> iterator()
|
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||||
{
|
{
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean onExpired(ConstantExpirable expirable)
|
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -84,6 +84,47 @@ public class CyclicTimeoutsTest
|
||||||
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpirableEntityBecomesNonExpirable() throws Exception
|
||||||
|
{
|
||||||
|
long timeout = 1000;
|
||||||
|
DynamicExpirable entity = new DynamicExpirable(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(timeout));
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
timeouts = new CyclicTimeouts<>(scheduler)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||||
|
{
|
||||||
|
entity.expireNanoTime = Long.MAX_VALUE;
|
||||||
|
return List.<Expirable>of(entity).iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
|
||||||
|
{
|
||||||
|
if (unit.toMillis(delay) > 2 * timeout)
|
||||||
|
latch.countDown();
|
||||||
|
return super.schedule(cyclicTimeout, delay, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
timeouts.schedule(entity);
|
||||||
|
|
||||||
|
// Wait until the timeouts check.
|
||||||
|
Thread.sleep(timeout);
|
||||||
|
|
||||||
|
// Since the expireNanoTime was changed to Long.MAX_VALUE,
|
||||||
|
// the entity must not have been scheduled nor expired.
|
||||||
|
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduleZero() throws Exception
|
public void testScheduleZero() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -93,14 +134,14 @@ public class CyclicTimeoutsTest
|
||||||
timeouts = new CyclicTimeouts<>(scheduler)
|
timeouts = new CyclicTimeouts<>(scheduler)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected Iterator<ConstantExpirable> iterator()
|
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||||
{
|
{
|
||||||
iteratorLatch.countDown();
|
iteratorLatch.countDown();
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean onExpired(ConstantExpirable expirable)
|
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||||
{
|
{
|
||||||
expiredLatch.countDown();
|
expiredLatch.countDown();
|
||||||
return false;
|
return false;
|
||||||
|
@ -119,21 +160,21 @@ public class CyclicTimeoutsTest
|
||||||
{
|
{
|
||||||
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
|
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
|
||||||
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
|
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
|
||||||
Collection<ConstantExpirable> collection = new ArrayList<>();
|
Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
|
||||||
collection.add(one);
|
collection.add(one);
|
||||||
AtomicInteger iterations = new AtomicInteger();
|
AtomicInteger iterations = new AtomicInteger();
|
||||||
CountDownLatch expiredLatch = new CountDownLatch(1);
|
CountDownLatch expiredLatch = new CountDownLatch(1);
|
||||||
timeouts = new CyclicTimeouts<>(scheduler)
|
timeouts = new CyclicTimeouts<>(scheduler)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected Iterator<ConstantExpirable> iterator()
|
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||||
{
|
{
|
||||||
iterations.incrementAndGet();
|
iterations.incrementAndGet();
|
||||||
return collection.iterator();
|
return collection.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean onExpired(ConstantExpirable expirable)
|
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||||
{
|
{
|
||||||
assertSame(one, expirable);
|
assertSame(one, expirable);
|
||||||
expiredLatch.countDown();
|
expiredLatch.countDown();
|
||||||
|
@ -169,22 +210,22 @@ public class CyclicTimeoutsTest
|
||||||
long delayMs = 2000;
|
long delayMs = 2000;
|
||||||
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
|
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
|
||||||
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
|
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
|
||||||
Collection<ConstantExpirable> collection = new ArrayList<>();
|
Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
|
||||||
collection.add(two);
|
collection.add(two);
|
||||||
CountDownLatch expiredLatch = new CountDownLatch(2);
|
CountDownLatch expiredLatch = new CountDownLatch(2);
|
||||||
List<ConstantExpirable> expired = new ArrayList<>();
|
List<CyclicTimeouts.Expirable> expired = new ArrayList<>();
|
||||||
timeouts = new CyclicTimeouts<>(scheduler)
|
timeouts = new CyclicTimeouts<>(scheduler)
|
||||||
{
|
{
|
||||||
private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
|
private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Iterator<ConstantExpirable> iterator()
|
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||||
{
|
{
|
||||||
return collection.iterator();
|
return collection.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean onExpired(ConstantExpirable expirable)
|
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||||
{
|
{
|
||||||
expired.add(expirable);
|
expired.add(expirable);
|
||||||
expiredLatch.countDown();
|
expiredLatch.countDown();
|
||||||
|
@ -259,4 +300,26 @@ public class CyclicTimeoutsTest
|
||||||
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
|
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class DynamicExpirable implements CyclicTimeouts.Expirable
|
||||||
|
{
|
||||||
|
private long expireNanoTime;
|
||||||
|
|
||||||
|
public DynamicExpirable(long expireNanoTime)
|
||||||
|
{
|
||||||
|
this.expireNanoTime = expireNanoTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getExpireNanoTime()
|
||||||
|
{
|
||||||
|
return expireNanoTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return String.format("%s@%x[%dms]", getClass().getSimpleName(), hashCode(), NanoTime.millisUntil(expireNanoTime));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
# Jetty Logging using jetty-slf4j-impl
|
|
||||||
#org.eclipse.jetty.LEVEL=DEBUG
|
#org.eclipse.jetty.LEVEL=DEBUG
|
||||||
#org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
|
#org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
|
||||||
#org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
|
#org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
|
||||||
|
|
|
@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable
|
public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable
|
||||||
{
|
{
|
||||||
private final Connector connector;
|
private final Connector connector;
|
||||||
private long expireNanoTime;
|
private long expireNanoTime = Long.MAX_VALUE;
|
||||||
|
|
||||||
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
|
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue