Fix `DynamicCapacity.release()` (#12083)

#12082 Add assertions to DynamicCapacity to avoid entering an undetected corrupt state after it is released.

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2024-08-29 15:53:08 +02:00 committed by GitHub
parent b106c935de
commit 85d7d07a24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 50 additions and 23 deletions

View File

@ -54,7 +54,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private final Collection<HTTP2Session.Entry> processedEntries = new ArrayList<>(); private final Collection<HTTP2Session.Entry> processedEntries = new ArrayList<>();
private final HTTP2Session session; private final HTTP2Session session;
private final RetainableByteBuffer.Mutable accumulator; private final RetainableByteBuffer.Mutable accumulator;
private boolean released;
private InvocationType invocationType = InvocationType.NON_BLOCKING; private InvocationType invocationType = InvocationType.NON_BLOCKING;
private Throwable terminated; private Throwable terminated;
private HTTP2Session.Entry stalledEntry; private HTTP2Session.Entry stalledEntry;
@ -334,7 +333,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private void finish() private void finish()
{ {
release(); accumulator.clear();
processedEntries.forEach(HTTP2Session.Entry::succeeded); processedEntries.forEach(HTTP2Session.Entry::succeeded);
processedEntries.clear(); processedEntries.clear();
invocationType = InvocationType.NON_BLOCKING; invocationType = InvocationType.NON_BLOCKING;
@ -354,15 +353,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
} }
} }
private void release()
{
if (!released)
{
released = true;
accumulator.release();
}
}
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
@ -382,12 +372,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
@Override @Override
protected void onCompleteFailure(Throwable x) protected void onCompleteFailure(Throwable x)
{ {
release(); accumulator.release();
} }
private void onSessionFailure(Throwable x) private void onSessionFailure(Throwable x)
{ {
release(); accumulator.clear();
Throwable closed = fail(x); Throwable closed = fail(x);
if (closed == null) if (closed == null)
session.close(ErrorCode.COMPRESSION_ERROR.code, null, NOOP); session.close(ErrorCode.COMPRESSION_ERROR.code, null, NOOP);

View File

@ -122,7 +122,7 @@ public class BadURITest
Thread.sleep(1000); Thread.sleep(1000);
// Send a second request and verify that it hits the Handler. // Send a second request and verify that it hits the Handler.
accumulator.release(); accumulator.clear();
MetaData.Request metaData2 = new MetaData.Request( MetaData.Request metaData2 = new MetaData.Request(
HttpMethod.GET.asString(), HttpMethod.GET.asString(),
HttpScheme.HTTP.asString(), HttpScheme.HTTP.asString(),

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http2.tests; package org.eclipse.jetty.http2.tests;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.ServerSocket; import java.net.ServerSocket;
@ -574,7 +575,7 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
} }
catch (HpackException x) catch (HpackException x)
{ {
x.printStackTrace(); throw new RuntimeException(x);
} }
} }
@ -591,7 +592,7 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
} }
catch (HpackException x) catch (HpackException x)
{ {
x.printStackTrace(); throw new RuntimeException(x);
} }
} }
@ -601,11 +602,11 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
{ {
// Write the frames. // Write the frames.
accumulator.writeTo(Content.Sink.from(output), false); accumulator.writeTo(Content.Sink.from(output), false);
accumulator.release(); accumulator.clear();
} }
catch (Throwable x) catch (IOException x)
{ {
x.printStackTrace(); throw new RuntimeException(x);
} }
} }
}); });

View File

@ -1470,7 +1470,6 @@ public interface RetainableByteBuffer extends Retainable
private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize) private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize)
{ {
super();
_pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool; _pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool;
_maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize; _maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize;
_buffers = buffers == null ? new ArrayList<>() : buffers; _buffers = buffers == null ? new ArrayList<>() : buffers;
@ -1481,6 +1480,12 @@ public interface RetainableByteBuffer extends Retainable
throw new IllegalArgumentException("must always retain if cannot aggregate"); throw new IllegalArgumentException("must always retain if cannot aggregate");
} }
private void checkNotReleased()
{
if (getRetained() <= 0)
throw new IllegalStateException("Already released");
}
public long getMaxSize() public long getMaxSize()
{ {
return _maxSize; return _maxSize;
@ -1515,6 +1520,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("getByteBuffer {}", this); LOG.debug("getByteBuffer {}", this);
checkNotReleased();
return switch (_buffers.size()) return switch (_buffers.size())
{ {
case 0 -> BufferUtil.EMPTY_BUFFER; case 0 -> BufferUtil.EMPTY_BUFFER;
@ -1548,6 +1554,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, length); LOG.debug("take {} {}", this, length);
checkNotReleased();
if (_buffers.isEmpty() || length == 0) if (_buffers.isEmpty() || length == 0)
return RetainableByteBuffer.EMPTY; return RetainableByteBuffer.EMPTY;
@ -1613,6 +1620,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, skip); LOG.debug("take {} {}", this, skip);
checkNotReleased();
if (_buffers.isEmpty() || skip > size()) if (_buffers.isEmpty() || skip > size())
return RetainableByteBuffer.EMPTY; return RetainableByteBuffer.EMPTY;
@ -1678,6 +1686,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("takeByteArray {}", this); LOG.debug("takeByteArray {}", this);
checkNotReleased();
return switch (_buffers.size()) return switch (_buffers.size())
{ {
case 0 -> BufferUtil.EMPTY_BUFFER.array(); case 0 -> BufferUtil.EMPTY_BUFFER.array();
@ -1723,6 +1732,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("get {}", this); LOG.debug("get {}", this);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{ {
RetainableByteBuffer buffer = i.next(); RetainableByteBuffer buffer = i.next();
@ -1749,6 +1759,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("get {} {}", this, index); LOG.debug("get {} {}", this, index);
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers) for (RetainableByteBuffer buffer : _buffers)
{ {
long size = buffer.size(); long size = buffer.size();
@ -1764,6 +1775,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("get array {} {}", this, length); LOG.debug("get array {} {}", this, length);
checkNotReleased();
int got = 0; int got = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{ {
@ -1791,6 +1803,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public boolean hasRemaining() public boolean hasRemaining()
{ {
checkNotReleased();
for (RetainableByteBuffer rbb : _buffers) for (RetainableByteBuffer rbb : _buffers)
if (!rbb.isEmpty()) if (!rbb.isEmpty())
return true; return true;
@ -1802,6 +1815,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("skip {} {}", this, length); LOG.debug("skip {} {}", this, length);
checkNotReleased();
long skipped = 0; long skipped = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{ {
@ -1824,6 +1838,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("limit {} {}", this, limit); LOG.debug("limit {} {}", this, limit);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{ {
RetainableByteBuffer buffer = i.next(); RetainableByteBuffer buffer = i.next();
@ -1851,6 +1866,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("slice {}", this); LOG.debug("slice {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size()); List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers) for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.slice()); buffers.add(rbb.slice());
@ -1862,6 +1878,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("slice {} {}", this, length); LOG.debug("slice {} {}", this, length);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size()); List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{ {
@ -1904,6 +1921,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("copy {}", this); LOG.debug("copy {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size()); List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers) for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.copy()); buffers.add(rbb.copy());
@ -1925,6 +1943,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public long size() public long size()
{ {
checkNotReleased();
long length = 0; long length = 0;
for (RetainableByteBuffer buffer : _buffers) for (RetainableByteBuffer buffer : _buffers)
length += buffer.remaining(); length += buffer.remaining();
@ -1953,6 +1972,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("release {}", this); LOG.debug("release {}", this);
checkNotReleased();
if (super.release()) if (super.release())
{ {
for (RetainableByteBuffer buffer : _buffers) for (RetainableByteBuffer buffer : _buffers)
@ -1985,6 +2005,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("clear {}", this); LOG.debug("clear {}", this);
checkNotReleased();
if (_buffers.isEmpty()) if (_buffers.isEmpty())
return; return;
_aggregate = null; _aggregate = null;
@ -1998,8 +2019,10 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes)); LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
// Cannot mutate contents if retained // Cannot mutate contents if retained
assert !isRetained(); if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");
// handle empty appends // handle empty appends
if (bytes == null) if (bytes == null)
@ -2097,9 +2120,10 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("append RBB {} {}", this, retainableBytes); LOG.debug("append RBB {} {}", this, retainableBytes);
checkNotReleased();
// Cannot mutate contents if retained // Cannot mutate contents if retained
assert !isRetained(); if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");
// Optimize appending dynamics // Optimize appending dynamics
if (retainableBytes instanceof DynamicCapacity dynamicCapacity) if (retainableBytes instanceof DynamicCapacity dynamicCapacity)
@ -2159,6 +2183,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes)); LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
add(RetainableByteBuffer.wrap(bytes)); add(RetainableByteBuffer.wrap(bytes));
return this; return this;
} }
@ -2168,6 +2193,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("add RBB {} <- {}", this, bytes); LOG.debug("add RBB {} <- {}", this, bytes);
checkNotReleased();
long size = size(); long size = size();
long space = _maxSize - size; long space = _maxSize - size;
long length = bytes.size(); long length = bytes.size();
@ -2188,6 +2214,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable put(byte b) public Mutable put(byte b)
{ {
checkNotReleased();
ensure(1).put(b); ensure(1).put(b);
return this; return this;
} }
@ -2195,6 +2222,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable put(long index, byte b) public Mutable put(long index, byte b)
{ {
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers) for (RetainableByteBuffer buffer : _buffers)
{ {
long size = buffer.size(); long size = buffer.size();
@ -2211,6 +2239,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable putShort(short s) public Mutable putShort(short s)
{ {
checkNotReleased();
ensure(2).putShort(s); ensure(2).putShort(s);
return this; return this;
} }
@ -2218,6 +2247,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable putInt(int i) public Mutable putInt(int i)
{ {
checkNotReleased();
ensure(4).putInt(i); ensure(4).putInt(i);
return this; return this;
} }
@ -2225,6 +2255,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable putLong(long l) public Mutable putLong(long l)
{ {
checkNotReleased();
ensure(8).putLong(l); ensure(8).putLong(l);
return this; return this;
} }
@ -2232,6 +2263,7 @@ public interface RetainableByteBuffer extends Retainable
@Override @Override
public Mutable put(byte[] bytes, int offset, int length) public Mutable put(byte[] bytes, int offset, int length)
{ {
checkNotReleased();
// Use existing aggregate if the length is large and there is space for at least half // Use existing aggregate if the length is large and there is space for at least half
if (length >= 16 && _aggregate != null) if (length >= 16 && _aggregate != null)
{ {
@ -2293,6 +2325,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to)); LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to));
checkNotReleased();
_aggregate = null; _aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{ {
@ -2310,6 +2343,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("appendTo RBB {} -> {}", this, to); LOG.debug("appendTo RBB {} -> {}", this, to);
checkNotReleased();
_aggregate = null; _aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{ {
@ -2327,6 +2361,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("putTo BB {} -> {}", this, toInfillMode); LOG.debug("putTo BB {} -> {}", this, toInfillMode);
checkNotReleased();
_aggregate = null; _aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();) for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{ {
@ -2342,6 +2377,7 @@ public interface RetainableByteBuffer extends Retainable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback); LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback);
checkNotReleased();
_aggregate = null; _aggregate = null;
switch (_buffers.size()) switch (_buffers.size())
{ {