Jetty 9.4.x 4967 buffer corruption for http2 failures (#5001)

Fixes #4967 - Possible buffer corruption in HTTP/2 session failures

Partially reverted the changes introduced in #4855, because they
were working only when sends were synchronous.

Introduced ByteBufferPool.remove(ByteBuffer) to fix the issue.
Now when a concurrent failure happens while frames are being
generated or sent, the buffer is discarded instead of being
recycled, therefore resolving the buffer corruption.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-07-03 16:35:33 +02:00 committed by GitHub
parent e0955192b8
commit ae43b70a9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 226 additions and 288 deletions

View File

@ -93,6 +93,7 @@ public abstract class AbstractHttpClientServerTest
{
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
@ -101,6 +102,7 @@ public abstract class AbstractHttpClientServerTest
LeakTrackingByteBufferPool pool = (LeakTrackingByteBufferPool)clientBufferPool;
assertThat("Client BufferPool - leaked acquires", pool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", pool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", pool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", pool.getLeakedResources(), Matchers.is(0L));
}

View File

@ -436,7 +436,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
return stream.isResetOrFailed();
}
private boolean isProtocolFrame(Frame frame)

View File

@ -232,7 +232,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onData(final DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
@ -252,7 +252,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
else
{
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} not found", streamId);
LOG.debug("Stream #{} not found on {}", streamId, this);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
@ -290,14 +290,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onPriority(PriorityFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
}
@Override
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
@ -329,7 +329,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onSettings(SettingsFrame frame, boolean reply)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
if (frame.isReply())
return;
@ -405,7 +405,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onPing(PingFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
if (frame.isReply())
{
@ -439,7 +439,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onGoAway(final GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
while (true)
{
@ -472,7 +472,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onWindowUpdate(WindowUpdateFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
int windowDelta = frame.getWindowDelta();
@ -512,7 +512,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} failure {}", streamId, this, failure);
onStreamFailure(streamId, error, reason, failure, callback);
}
@ -533,12 +535,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onConnectionFailure(int error, String reason, Callback callback)
{
Throwable failure = toFailure("Session failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session failure {}", this, failure);
onFailure(error, reason, failure, new CloseCallback(error, reason, callback));
}
protected void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Session abort {}", this, failure);
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}
@ -558,7 +564,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session close {}", this, failure);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
@ -569,9 +577,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
notifyClose(this, frame, countCallback);
}
private Throwable toFailure(String message, int error, String reason)
private Throwable toFailure(int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
return new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason));
}
@Override
@ -721,7 +729,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void frame(HTTP2Flusher.Entry entry, boolean flush)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", flush ? "Sending" : "Queueing", entry.frame);
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this);
// Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
@ -822,7 +830,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
onStreamClosed(stream);
flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream);
LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this);
}
}

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -54,16 +55,17 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicReference<Callback> writing = new AtomicReference<>();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final long timeStamp = System.nanoTime();
private final ISession session;
private final int streamId;
private final boolean local;
private Callback sendCallback;
private Throwable failure;
private boolean localReset;
private Listener listener;
private boolean remoteReset;
private Listener listener;
private long dataLength;
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
@ -128,17 +130,31 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public void reset(ResetFrame frame, Callback callback)
{
if (isReset())
return;
localReset = true;
synchronized (this)
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}
private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
callback.failed(new WritePendingException());
Throwable failure;
synchronized (this)
{
failure = this.failure;
if (failure == null && sendCallback == null)
{
sendCallback = callback;
return true;
}
}
if (failure == null)
failure = new WritePendingException();
callback.failed(failure);
return false;
}
@ -163,7 +179,27 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public boolean isReset()
{
return localReset || remoteReset;
synchronized (this)
{
return localReset || remoteReset;
}
}
private boolean isFailed()
{
synchronized (this)
{
return failure != null;
}
}
@Override
public boolean isResetOrFailed()
{
synchronized (this)
{
return isReset() || isFailed();
}
}
@Override
@ -336,7 +372,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
synchronized (this)
{
remoteReset = true;
failure = new EofException("reset");
}
close();
session.removeStream(this);
notifyReset(this, frame, callback);
@ -357,8 +397,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
synchronized (this)
{
failure = frame.getFailure();
}
close();
session.removeStream(this);
notifyFailure(this, frame, callback);
}
@ -541,7 +585,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Callback endWrite()
{
return writing.getAndSet(null);
synchronized (this)
{
Callback callback = sendCallback;
sendCallback = null;
return callback;
}
}
private void notifyData(Stream stream, DataFrame frame, Callback callback)

View File

@ -114,4 +114,11 @@ public interface IStream extends Stream, Closeable
* @see #isClosed()
*/
boolean isRemotelyClosed();
/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();
}

View File

@ -149,7 +149,7 @@ public class Parser
return false;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame header from {}", headerParser, buffer);
LOG.debug("Parsed {} frame header from {}@{}", headerParser, buffer, Integer.toHexString(buffer.hashCode()));
if (headerParser.getLength() > getMaxFrameLength())
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR, "invalid_frame_length");
@ -199,7 +199,7 @@ public class Parser
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(type), buffer);
LOG.debug("Parsed {} frame body from {}@{}", FrameType.from(type), buffer, Integer.toHexString(buffer.hashCode()));
reset();
return true;
}

View File

@ -313,7 +313,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void onStreamFailure(Throwable failure)
{
transportCallback.failed(failure);
transportCallback.abort(failure);
}
public boolean onStreamTimeout(Throwable failure)
@ -361,17 +361,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
* being reset, or the connection being closed</li>
* <li>an asynchronous idle timeout</li>
* </ul>
* <p>The last 2 cases may happen <em>during</em> a send, when the frames
* are being generated in the flusher.
* In such cases, this class must avoid that the nested callback is notified
* while the frame generation is in progress, because the nested callback
* may modify other states (such as clearing the {@code HttpOutput._buffer})
* that are accessed during frame generation.</p>
* <p>The solution implemented in this class works by splitting the send
* operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}.
* Asynchronous state changes happening during {@code send} are stored
* and only executed in {@code post-send}, therefore never interfering
* with frame generation.</p>
*
* @see State
*/
@ -395,14 +384,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
Throwable failure = sending(callback, commit);
if (failure == null)
{
sendFrame.accept(this);
pending();
}
else
{
callback.failed(failure);
}
}
private void abort(Throwable failure)
{
failed(failure);
}
private Throwable sending(Callback callback, boolean commit)
@ -430,58 +419,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
}
private void pending()
{
Callback callback;
boolean commit;
Throwable failure;
synchronized (this)
{
switch (_state)
{
case SENDING:
{
// The send has not completed the callback yet,
// wait for succeeded() or failed() to be called.
_state = State.PENDING;
return;
}
case SUCCEEDING:
{
// The send already completed successfully, but the
// call to succeeded() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = null;
reset(null);
break;
}
case FAILING:
{
// The send already completed with a failure, but
// the call to failed() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = _failure;
reset(failure);
break;
}
default:
{
callback = _callback;
commit = _commit;
failure = new IllegalStateException("Invalid transport state: " + _state);
reset(failure);
break;
}
}
}
if (failure == null)
succeed(callback, commit);
else
fail(callback, commit, failure);
}
@Override
public void succeeded()
{
@ -489,30 +426,21 @@ public class HttpTransportOverHTTP2 implements HttpTransport
boolean commit;
synchronized (this)
{
switch (_state)
if (_state != State.SENDING)
{
case SENDING:
{
_state = State.SUCCEEDING;
// Succeeding the callback will be done in postSend().
return;
}
case PENDING:
{
callback = _callback;
commit = _commit;
reset(null);
break;
}
default:
{
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
callback = _callback;
commit = _commit;
reset(null);
}
succeed(callback, commit);
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
@Override
@ -522,104 +450,37 @@ public class HttpTransportOverHTTP2 implements HttpTransport
boolean commit;
synchronized (this)
{
switch (_state)
if (_state != State.SENDING)
{
case SENDING:
{
_state = State.FAILING;
_failure = failure;
// Failing the callback will be done in postSend().
return;
}
case IDLE:
case PENDING:
{
callback = _callback;
commit = _commit;
reset(failure);
break;
}
default:
{
// This thread lost the race to fail the current send,
// as other threads already succeeded or failed it.
return;
}
reset(failure);
return;
}
callback = _callback;
commit = _commit;
reset(failure);
}
fail(callback, commit, failure);
}
private boolean idleTimeout(Throwable failure)
{
Callback callback;
boolean timeout;
synchronized (this)
{
switch (_state)
{
case PENDING:
{
// The send was started but idle timed out, fail it.
callback = _callback;
timeout = true;
reset(failure);
break;
}
case IDLE:
// The application may be suspended, ignore the idle timeout.
case SENDING:
// A send has been started at the same time of an idle timeout;
// Ignore the idle timeout and let the write continue normally.
case SUCCEEDING:
case FAILING:
// An idle timeout during these transient states is ignored.
case FAILED:
// Already failed, ignore the idle timeout.
{
callback = null;
timeout = false;
break;
}
default:
{
// Should not happen, but just in case.
callback = _callback;
if (callback == null)
callback = Callback.NOOP;
timeout = true;
failure = new IllegalStateException("Invalid transport state: " + _state, failure);
reset(failure);
break;
}
}
}
idleTimeout(callback, timeout, failure);
return timeout;
}
private void succeed(Callback callback, boolean commit)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
private void fail(Callback callback, boolean commit, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} failure",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush",
failure);
if (callback != null)
callback.failed(failure);
callback.failed(failure);
}
private void idleTimeout(Callback callback, boolean timeout, Throwable failure)
private boolean idleTimeout(Throwable failure)
{
Callback callback = null;
synchronized (this)
{
// Ignore idle timeouts if not writing,
// as the application may be suspended.
if (_state == State.SENDING)
{
callback = _callback;
reset(failure);
}
}
boolean timeout = callback != null;
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} idle timeout {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
@ -627,6 +488,18 @@ public class HttpTransportOverHTTP2 implements HttpTransport
failure);
if (timeout)
callback.failed(failure);
return timeout;
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
callback = _callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
}
@ -639,67 +512,12 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
/**
* <p>No send initiated or in progress.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)}
* is called by the transport to initiate a send</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called by an asynchronous failure</li>
* </ul>
*/
IDLE,
/**
* <p>A send is initiated; the nested callback in {@link TransportCallback}
* cannot be notified while in this state.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SUCCEEDING}, when {@link TransportCallback#succeeded()}
* is called synchronously because the send succeeded</li>
* <li>{@link #FAILING}, when {@link TransportCallback#failed(Throwable)}
* is called synchronously because the send failed</li>
* <li>{@link #PENDING}, when {@link TransportCallback#pending()}
* is called before the send completes</li>
* </ul>
* <p>A send is initiated and possibly in progress.</p>
*/
SENDING,
/**
* <p>A send was initiated and is now pending, waiting for the {@link TransportCallback}
* to be notified of success or failure.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#succeeded()}
* is called because the send succeeded</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called because either the send failed, or an asynchronous failure happened</li>
* </ul>
*/
PENDING,
/**
* <p>A send was initiated and succeeded, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the success actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
SUCCEEDING,
/**
* <p>A send was initiated and failed, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the failure actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #FAILED}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
FAILING,
/**
* <p>The terminal state indicating failure of the send.</p>
*/

View File

@ -57,6 +57,18 @@ public interface ByteBufferPool
*/
void release(ByteBuffer buffer);
/**
* <p>Removes a {@link ByteBuffer} that was previously obtained with {@link #acquire(int, boolean)}.</p>
* <p>The buffer will not be available for further reuse.</p>
*
* @param buffer the buffer to remove
* @see #acquire(int, boolean)
* @see #release(ByteBuffer)
*/
default void remove(ByteBuffer buffer)
{
}
/**
* <p>Creates a new ByteBuffer of the given capacity and the given directness.</p>
*

View File

@ -23,10 +23,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ManagedObject
public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements ByteBufferPool
{
private static final Logger LOG = Log.getLogger(LeakTrackingByteBufferPool.class);
@ -47,11 +50,11 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
}
};
private static final boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY");
private final ByteBufferPool delegate;
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedAcquires = new AtomicLong(0);
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedRemoves = new AtomicLong(0);
private final AtomicLong leaked = new AtomicLong(0);
private final ByteBufferPool delegate;
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
{
@ -64,12 +67,12 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size, direct);
boolean leaked = leakDetector.acquired(buffer);
if (NOISY || !leaked)
boolean acquired = leakDetector.acquired(buffer);
if (!acquired)
{
leakedAcquires.incrementAndGet();
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"),
new Throwable("LeakStack.Acquire"));
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked acquire for id {}", leakDetector.id(buffer), new Throwable("acquire"));
}
return buffer;
}
@ -79,16 +82,36 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
{
if (buffer == null)
return;
boolean leaked = leakDetector.released(buffer);
if (NOISY || !leaked)
boolean released = leakDetector.released(buffer);
if (!released)
{
leakedReleases.incrementAndGet();
LOG.info(String.format("ByteBuffer release %s leaked.released=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), new Throwable(
"LeakStack.Release"));
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked release for id {}", leakDetector.id(buffer), new Throwable("release"));
}
delegate.release(buffer);
}
@Override
public void remove(ByteBuffer buffer)
{
if (buffer == null)
return;
boolean released = leakDetector.released(buffer);
if (!released)
{
leakedRemoves.incrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked remove for id {}", leakDetector.id(buffer), new Throwable("remove"));
}
delegate.remove(buffer);
}
/**
* Clears the tracking data returned by {@link #getLeakedAcquires()},
* {@link #getLeakedReleases()}, {@link #getLeakedResources()}.
*/
@ManagedAttribute("Clears the tracking data")
public void clearTracking()
{
leakedAcquires.set(0);
@ -96,24 +119,36 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
}
/**
* @return count of BufferPool.acquire() calls that detected a leak
* @return count of ByteBufferPool.acquire() calls that detected a leak
*/
@ManagedAttribute("The number of acquires that produced a leak")
public long getLeakedAcquires()
{
return leakedAcquires.get();
}
/**
* @return count of BufferPool.release() calls that detected a leak
* @return count of ByteBufferPool.release() calls that detected a leak
*/
@ManagedAttribute("The number of releases that produced a leak")
public long getLeakedReleases()
{
return leakedReleases.get();
}
/**
* @return count of ByteBufferPool.remove() calls that detected a leak
*/
@ManagedAttribute("The number of removes that produced a leak")
public long getLeakedRemoves()
{
return leakedRemoves.get();
}
/**
* @return count of resources that were acquired but not released
*/
@ManagedAttribute("The number of resources that were leaked")
public long getLeakedResources()
{
return leaked.get();

View File

@ -518,7 +518,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// TODO that is done.
// Set a close callback on the HttpOutput to make it an async callback
_response.completeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed));
break;
}
@ -633,7 +633,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
_request.setHandled(true);
_state.completing();
sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(_state::completed));
sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(() -> _state.completed(null), _state::completed));
}
catch (Throwable x)
{
@ -1221,7 +1221,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override
public void succeeded()
{
_response.getHttpOutput().completed();
_response.getHttpOutput().completed(null);
super.failed(x);
}

View File

@ -943,7 +943,7 @@ public class HttpChannelState
}
}
protected void completed()
protected void completed(Throwable failure)
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
@ -976,7 +976,7 @@ public class HttpChannelState
}
// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().completed();
_channel.getResponse().getHttpOutput().completed(failure);
if (event != null)
{

View File

@ -37,6 +37,7 @@ import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -300,7 +301,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_state = State.CLOSED;
closedCallback = _closedCallback;
_closedCallback = null;
releaseBuffer();
releaseBuffer(failure);
wake = updateApiState(failure);
}
else if (_state == State.CLOSE)
@ -482,12 +483,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
/**
* Called to indicate that the request cycle has been completed.
*/
public void completed()
public void completed(Throwable failure)
{
synchronized (_channelState)
{
_state = State.CLOSED;
releaseBuffer();
releaseBuffer(failure);
}
}
@ -626,11 +627,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _aggregate;
}
private void releaseBuffer()
private void releaseBuffer(Throwable failure)
{
if (_aggregate != null)
{
_channel.getConnector().getByteBufferPool().release(_aggregate);
ByteBufferPool bufferPool = _channel.getConnector().getByteBufferPool();
if (failure == null)
bufferPool.release(_aggregate);
else
bufferPool.remove(_aggregate);
_aggregate = null;
}
}
@ -1401,7 +1406,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
releaseBuffer(null);
_written = 0;
_writeListener = null;
_onError = null;

View File

@ -114,6 +114,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
@ -123,6 +124,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", clientBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
}