Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-07-03 16:46:23 +02:00
commit 7d8e56bd10
13 changed files with 226 additions and 288 deletions

View File

@ -93,6 +93,7 @@ public abstract class AbstractHttpClientServerTest
{
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
@ -101,6 +102,7 @@ public abstract class AbstractHttpClientServerTest
LeakTrackingByteBufferPool pool = (LeakTrackingByteBufferPool)clientBufferPool;
assertThat("Client BufferPool - leaked acquires", pool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", pool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", pool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", pool.getLeakedResources(), Matchers.is(0L));
}

View File

@ -436,7 +436,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
return stream.isResetOrFailed();
}
private boolean isProtocolFrame(Frame frame)

View File

@ -239,7 +239,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onData(final DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
@ -259,7 +259,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
else
{
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} not found", streamId);
LOG.debug("Stream #{} not found on {}", streamId, this);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
@ -297,14 +297,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onPriority(PriorityFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
}
@Override
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
@ -336,7 +336,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onSettings(SettingsFrame frame, boolean reply)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
if (frame.isReply())
return;
@ -420,7 +420,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onPing(PingFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
if (frame.isReply())
{
@ -454,7 +454,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onGoAway(final GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.REMOTELY_CLOSED))
{
@ -473,7 +473,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onWindowUpdate(WindowUpdateFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
int streamId = frame.getStreamId();
int windowDelta = frame.getWindowDelta();
@ -528,7 +528,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} failure {}", streamId, this, failure);
onStreamFailure(streamId, error, reason, failure, callback);
}
@ -549,12 +551,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onConnectionFailure(int error, String reason, Callback callback)
{
Throwable failure = toFailure("Session failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session failure {}", this, failure);
onFailure(error, reason, failure, new FailureCallback(error, reason, callback));
}
protected void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Session abort {}", this, failure);
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}
@ -574,7 +580,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session close {}", this, failure);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
@ -585,9 +593,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
notifyClose(this, frame, countCallback);
}
private Throwable toFailure(String message, int error, String reason)
private Throwable toFailure(int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
return new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason));
}
@Override
@ -777,7 +785,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void frame(HTTP2Flusher.Entry entry, boolean flush)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", flush ? "Sending" : "Queueing", entry.frame);
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this);
// Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
@ -873,7 +881,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
onStreamClosed(stream);
flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream);
LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this);
}
}

View File

@ -42,6 +42,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MathUtils;
@ -61,7 +62,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicReference<Callback> writing = new AtomicReference<>();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final long timeStamp = System.nanoTime();
@ -69,9 +69,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private final int streamId;
private final MetaData.Request request;
private final boolean local;
private Callback sendCallback;
private Throwable failure;
private boolean localReset;
private Listener listener;
private boolean remoteReset;
private Listener listener;
private long dataLength;
private long dataDemand;
private boolean dataInitial;
@ -141,17 +143,31 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public void reset(ResetFrame frame, Callback callback)
{
if (isReset())
return;
localReset = true;
synchronized (this)
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}
private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
callback.failed(new WritePendingException());
Throwable failure;
synchronized (this)
{
failure = this.failure;
if (failure == null && sendCallback == null)
{
sendCallback = callback;
return true;
}
}
if (failure == null)
failure = new WritePendingException();
callback.failed(failure);
return false;
}
@ -176,7 +192,27 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public boolean isReset()
{
return localReset || remoteReset;
synchronized (this)
{
return localReset || remoteReset;
}
}
private boolean isFailed()
{
synchronized (this)
{
return failure != null;
}
}
@Override
public boolean isResetOrFailed()
{
synchronized (this)
{
return isReset() || isFailed();
}
}
@Override
@ -440,7 +476,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
synchronized (this)
{
remoteReset = true;
failure = new EofException("reset");
}
close();
session.removeStream(this);
notifyReset(this, frame, callback);
@ -461,8 +501,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
synchronized (this)
{
failure = frame.getFailure();
}
close();
session.removeStream(this);
notifyFailure(this, frame, callback);
}
@ -645,7 +689,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Callback endWrite()
{
return writing.getAndSet(null);
synchronized (this)
{
Callback callback = sendCallback;
sendCallback = null;
return callback;
}
}
private void notifyNewStream(Stream stream)

View File

@ -114,4 +114,11 @@ public interface IStream extends Stream, Closeable
* @see #isClosed()
*/
boolean isRemotelyClosed();
/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();
}

View File

@ -149,7 +149,7 @@ public class Parser
return false;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame header from {}", headerParser, buffer);
LOG.debug("Parsed {} frame header from {}@{}", headerParser, buffer, Integer.toHexString(buffer.hashCode()));
if (headerParser.getLength() > getMaxFrameLength())
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR, "invalid_frame_length");
@ -199,7 +199,7 @@ public class Parser
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(type), buffer);
LOG.debug("Parsed {} frame body from {}@{}", FrameType.from(type), buffer, Integer.toHexString(buffer.hashCode()));
reset();
return true;
}

View File

@ -330,7 +330,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void onStreamFailure(Throwable failure)
{
transportCallback.failed(failure);
transportCallback.abort(failure);
}
public boolean onStreamTimeout(Throwable failure)
@ -408,17 +408,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
* being reset, or the connection being closed</li>
* <li>an asynchronous idle timeout</li>
* </ul>
* <p>The last 2 cases may happen <em>during</em> a send, when the frames
* are being generated in the flusher.
* In such cases, this class must avoid that the nested callback is notified
* while the frame generation is in progress, because the nested callback
* may modify other states (such as clearing the {@code HttpOutput._buffer})
* that are accessed during frame generation.</p>
* <p>The solution implemented in this class works by splitting the send
* operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}.
* Asynchronous state changes happening during {@code send} are stored
* and only executed in {@code post-send}, therefore never interfering
* with frame generation.</p>
*
* @see State
*/
@ -442,14 +431,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
Throwable failure = sending(callback, commit);
if (failure == null)
{
sendFrame.accept(this);
pending();
}
else
{
callback.failed(failure);
}
}
private void abort(Throwable failure)
{
failed(failure);
}
private Throwable sending(Callback callback, boolean commit)
@ -477,58 +466,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
}
private void pending()
{
Callback callback;
boolean commit;
Throwable failure;
synchronized (this)
{
switch (_state)
{
case SENDING:
{
// The send has not completed the callback yet,
// wait for succeeded() or failed() to be called.
_state = State.PENDING;
return;
}
case SUCCEEDING:
{
// The send already completed successfully, but the
// call to succeeded() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = null;
reset(null);
break;
}
case FAILING:
{
// The send already completed with a failure, but
// the call to failed() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = _failure;
reset(failure);
break;
}
default:
{
callback = _callback;
commit = _commit;
failure = new IllegalStateException("Invalid transport state: " + _state);
reset(failure);
break;
}
}
}
if (failure == null)
succeed(callback, commit);
else
fail(callback, commit, failure);
}
@Override
public void succeeded()
{
@ -536,30 +473,21 @@ public class HttpTransportOverHTTP2 implements HttpTransport
boolean commit;
synchronized (this)
{
switch (_state)
if (_state != State.SENDING)
{
case SENDING:
{
_state = State.SUCCEEDING;
// Succeeding the callback will be done in postSend().
return;
}
case PENDING:
{
callback = _callback;
commit = _commit;
reset(null);
break;
}
default:
{
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
callback = _callback;
commit = _commit;
reset(null);
}
succeed(callback, commit);
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
@Override
@ -569,104 +497,37 @@ public class HttpTransportOverHTTP2 implements HttpTransport
boolean commit;
synchronized (this)
{
switch (_state)
if (_state != State.SENDING)
{
case SENDING:
{
_state = State.FAILING;
_failure = failure;
// Failing the callback will be done in postSend().
return;
}
case IDLE:
case PENDING:
{
callback = _callback;
commit = _commit;
reset(failure);
break;
}
default:
{
// This thread lost the race to fail the current send,
// as other threads already succeeded or failed it.
return;
}
reset(failure);
return;
}
callback = _callback;
commit = _commit;
reset(failure);
}
fail(callback, commit, failure);
}
private boolean idleTimeout(Throwable failure)
{
Callback callback;
boolean timeout;
synchronized (this)
{
switch (_state)
{
case PENDING:
{
// The send was started but idle timed out, fail it.
callback = _callback;
timeout = true;
reset(failure);
break;
}
case IDLE:
// The application may be suspended, ignore the idle timeout.
case SENDING:
// A send has been started at the same time of an idle timeout;
// Ignore the idle timeout and let the write continue normally.
case SUCCEEDING:
case FAILING:
// An idle timeout during these transient states is ignored.
case FAILED:
// Already failed, ignore the idle timeout.
{
callback = null;
timeout = false;
break;
}
default:
{
// Should not happen, but just in case.
callback = _callback;
if (callback == null)
callback = Callback.NOOP;
timeout = true;
failure = new IllegalStateException("Invalid transport state: " + _state, failure);
reset(failure);
break;
}
}
}
idleTimeout(callback, timeout, failure);
return timeout;
}
private void succeed(Callback callback, boolean commit)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
private void fail(Callback callback, boolean commit, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} failure",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush",
failure);
if (callback != null)
callback.failed(failure);
callback.failed(failure);
}
private void idleTimeout(Callback callback, boolean timeout, Throwable failure)
private boolean idleTimeout(Throwable failure)
{
Callback callback = null;
synchronized (this)
{
// Ignore idle timeouts if not writing,
// as the application may be suspended.
if (_state == State.SENDING)
{
callback = _callback;
reset(failure);
}
}
boolean timeout = callback != null;
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} idle timeout {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
@ -674,6 +535,18 @@ public class HttpTransportOverHTTP2 implements HttpTransport
failure);
if (timeout)
callback.failed(failure);
return timeout;
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
callback = _callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
}
@ -686,67 +559,12 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
/**
* <p>No send initiated or in progress.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)}
* is called by the transport to initiate a send</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called by an asynchronous failure</li>
* </ul>
*/
IDLE,
/**
* <p>A send is initiated; the nested callback in {@link TransportCallback}
* cannot be notified while in this state.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SUCCEEDING}, when {@link TransportCallback#succeeded()}
* is called synchronously because the send succeeded</li>
* <li>{@link #FAILING}, when {@link TransportCallback#failed(Throwable)}
* is called synchronously because the send failed</li>
* <li>{@link #PENDING}, when {@link TransportCallback#pending()}
* is called before the send completes</li>
* </ul>
* <p>A send is initiated and possibly in progress.</p>
*/
SENDING,
/**
* <p>A send was initiated and is now pending, waiting for the {@link TransportCallback}
* to be notified of success or failure.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#succeeded()}
* is called because the send succeeded</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called because either the send failed, or an asynchronous failure happened</li>
* </ul>
*/
PENDING,
/**
* <p>A send was initiated and succeeded, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the success actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
SUCCEEDING,
/**
* <p>A send was initiated and failed, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the failure actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #FAILED}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
FAILING,
/**
* <p>The terminal state indicating failure of the send.</p>
*/

View File

@ -57,6 +57,18 @@ public interface ByteBufferPool
*/
public void release(ByteBuffer buffer);
/**
* <p>Removes a {@link ByteBuffer} that was previously obtained with {@link #acquire(int, boolean)}.</p>
* <p>The buffer will not be available for further reuse.</p>
*
* @param buffer the buffer to remove
* @see #acquire(int, boolean)
* @see #release(ByteBuffer)
*/
default void remove(ByteBuffer buffer)
{
}
/**
* <p>Creates a new ByteBuffer of the given capacity and the given directness.</p>
*

View File

@ -23,10 +23,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedObject
public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements ByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(LeakTrackingByteBufferPool.class);
@ -47,11 +50,11 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
}
};
private static final boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY");
private final ByteBufferPool delegate;
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedAcquires = new AtomicLong(0);
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedRemoves = new AtomicLong(0);
private final AtomicLong leaked = new AtomicLong(0);
private final ByteBufferPool delegate;
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
{
@ -64,12 +67,12 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size, direct);
boolean leaked = leakDetector.acquired(buffer);
if (NOISY || !leaked)
boolean acquired = leakDetector.acquired(buffer);
if (!acquired)
{
leakedAcquires.incrementAndGet();
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"),
new Throwable("LeakStack.Acquire"));
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked acquire for id {}", leakDetector.id(buffer), new Throwable("acquire"));
}
return buffer;
}
@ -79,16 +82,36 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
{
if (buffer == null)
return;
boolean leaked = leakDetector.released(buffer);
if (NOISY || !leaked)
boolean released = leakDetector.released(buffer);
if (!released)
{
leakedReleases.incrementAndGet();
LOG.info(String.format("ByteBuffer release %s leaked.released=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), new Throwable(
"LeakStack.Release"));
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked release for id {}", leakDetector.id(buffer), new Throwable("release"));
}
delegate.release(buffer);
}
@Override
public void remove(ByteBuffer buffer)
{
if (buffer == null)
return;
boolean released = leakDetector.released(buffer);
if (!released)
{
leakedRemoves.incrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer leaked remove for id {}", leakDetector.id(buffer), new Throwable("remove"));
}
delegate.remove(buffer);
}
/**
* Clears the tracking data returned by {@link #getLeakedAcquires()},
* {@link #getLeakedReleases()}, {@link #getLeakedResources()}.
*/
@ManagedAttribute("Clears the tracking data")
public void clearTracking()
{
leakedAcquires.set(0);
@ -96,24 +119,36 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
}
/**
* @return count of BufferPool.acquire() calls that detected a leak
* @return count of ByteBufferPool.acquire() calls that detected a leak
*/
@ManagedAttribute("The number of acquires that produced a leak")
public long getLeakedAcquires()
{
return leakedAcquires.get();
}
/**
* @return count of BufferPool.release() calls that detected a leak
* @return count of ByteBufferPool.release() calls that detected a leak
*/
@ManagedAttribute("The number of releases that produced a leak")
public long getLeakedReleases()
{
return leakedReleases.get();
}
/**
* @return count of ByteBufferPool.remove() calls that detected a leak
*/
@ManagedAttribute("The number of removes that produced a leak")
public long getLeakedRemoves()
{
return leakedRemoves.get();
}
/**
* @return count of resources that were acquired but not released
*/
@ManagedAttribute("The number of resources that were leaked")
public long getLeakedResources()
{
return leaked.get();

View File

@ -504,7 +504,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// TODO that is done.
// Set a close callback on the HttpOutput to make it an async callback
_response.completeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed));
break;
}
@ -644,7 +644,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
_request.setHandled(true);
_state.completing();
sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(_state::completed));
sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(() -> _state.completed(null), _state::completed));
}
catch (Throwable x)
{
@ -1250,7 +1250,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override
public void succeeded()
{
_response.getHttpOutput().completed();
_response.getHttpOutput().completed(null);
super.failed(x);
}

View File

@ -948,7 +948,7 @@ public class HttpChannelState
}
}
protected void completed()
protected void completed(Throwable failure)
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
@ -981,7 +981,7 @@ public class HttpChannelState
}
// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().completed();
_channel.getResponse().getHttpOutput().completed(failure);
if (event != null)
{

View File

@ -36,6 +36,7 @@ import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -288,7 +289,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_state = State.CLOSED;
closedCallback = _closedCallback;
_closedCallback = null;
releaseBuffer();
releaseBuffer(failure);
wake = updateApiState(failure);
}
else if (_state == State.CLOSE)
@ -470,12 +471,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
/**
* Called to indicate that the request cycle has been completed.
*/
public void completed()
public void completed(Throwable failure)
{
synchronized (_channelState)
{
_state = State.CLOSED;
releaseBuffer();
releaseBuffer(failure);
}
}
@ -614,11 +615,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _aggregate;
}
private void releaseBuffer()
private void releaseBuffer(Throwable failure)
{
if (_aggregate != null)
{
_channel.getConnector().getByteBufferPool().release(_aggregate);
ByteBufferPool bufferPool = _channel.getConnector().getByteBufferPool();
if (failure == null)
bufferPool.release(_aggregate);
else
bufferPool.remove(_aggregate);
_aggregate = null;
}
}
@ -1353,7 +1358,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
releaseBuffer(null);
_written = 0;
_writeListener = null;
_onError = null;

View File

@ -117,6 +117,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
@ -126,6 +127,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", clientBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
}