Issue #272 - WebSocket hangs in blockingWrite.

Rewritten FrameFlusher.
This commit is contained in:
Simone Bordet 2017-10-05 22:20:22 +02:00
parent 6faaf3346c
commit 4389da9196
2 changed files with 288 additions and 351 deletions

View File

@ -68,19 +68,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
@Override @Override
protected void onFailure(Throwable x) public void onCompleteFailure(Throwable failure)
{ {
notifyError(x); super.onCompleteFailure(failure);
notifyError(failure);
if (ioState.wasAbnormalClose()) if (ioState.wasAbnormalClose())
{ {
LOG.ignore(x); LOG.ignore(failure);
return; return;
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Write flush failure",x); LOG.debug("Write flush failure", failure);
ioState.onWriteFailure(x); ioState.onWriteFailure(failure);
disconnect();
} }
} }
@ -186,6 +186,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.session = session; this.session = session;
} }
@Override
public boolean onIdleExpired()
{
// TODO: handle closing handshake (see HTTP2Connection).
return super.onIdleExpired();
}
/** /**
* Jetty Connection Close * Jetty Connection Close
*/ */
@ -200,17 +207,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} disconnect()",policy.getBehavior()); LOG.debug("{} disconnect()",policy.getBehavior());
flusher.terminate(new EOFException("Disconnected"), false);
try
{
flusher.close();
}
catch (Throwable ignored)
{
LOG.ignore(ignored);
}
// close FrameFlusher, we cannot write anymore at this point.
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow // We need to gently close first, to allow
// SSL close alerts to be sent by Jetty // SSL close alerts to be sent by Jetty
@ -323,7 +320,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("{} onClose()",policy.getBehavior()); LOG.debug("{} onClose()",policy.getBehavior());
super.onClose(); super.onClose();
ioState.onDisconnected(); ioState.onDisconnected();
flusher.close();
} }
@Override @Override

View File

@ -18,8 +18,8 @@
package org.eclipse.jetty.websocket.common.io; package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
@ -39,212 +39,308 @@ import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame; import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
/** public class FrameFlusher extends IteratingCallback
* Interface for working with bytes destined for {@link EndPoint#write(org.eclipse.jetty.util.Callback, ByteBuffer...)}
*/
public class FrameFlusher
{ {
private class Flusher extends IteratingCallback public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final ByteBufferPool bufferPool;
private final EndPoint endPoint;
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final List<FrameEntry> entries;
private final List<ByteBuffer> buffers;
private boolean closed;
private Throwable terminated;
private ByteBuffer aggregate;
private BatchMode batchMode;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
{ {
private final List<FrameEntry> entries; this.bufferPool = bufferPool;
private final List<ByteBuffer> buffers; this.endPoint = endPoint;
private ByteBuffer aggregate; this.bufferSize = bufferSize;
private BatchMode batchMode; this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
this.entries = new ArrayList<>(maxGather);
this.buffers = new ArrayList<>((maxGather * 2) + 1);
}
public Flusher(int maxGather) public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
Throwable closed;
synchronized (this)
{ {
entries = new ArrayList<>(maxGather); closed = terminated;
buffers = new ArrayList<>((maxGather * 2) + 1); if (closed == null)
{
byte opCode = frame.getOpCode();
if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
else
queue.offerLast(entry);
}
} }
private Action batch() if (closed == null)
iterate();
else
notifyCallbackFailure(callback, closed);
}
@Override
protected Action process() throws Throwable
{
if (LOG.isDebugEnabled())
LOG.debug("Flushing {}", this);
int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (this)
{ {
if (aggregate == null) if (closed)
return Action.SUCCEEDED;
if (terminated != null)
throw terminated;
while (!queue.isEmpty() && entries.size() <= maxGather)
{ {
aggregate = bufferPool.acquire(bufferSize,true); FrameEntry entry = queue.poll();
if (LOG.isDebugEnabled()) currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
{
LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here. // Force flush if we need to.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
entry.generateHeaderBytes(aggregate);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
BufferUtil.append(aggregate,payload);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
succeeded();
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
public void onCompleteFailure(Throwable x)
{
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,x);
entry.release();
}
entries.clear();
failure = x;
onFailure(x);
}
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
buffers.add(aggregate);
if (LOG.isDebugEnabled())
{
LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
// Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME) if (entry.frame == FLUSH_FRAME)
{ currentBatchMode = BatchMode.OFF;
continue;
}
buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
buffers.add(payload);
}
}
if (LOG.isDebugEnabled()) int payloadLength = BufferUtil.length(entry.frame.getPayload());
{ int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
if (buffers.isEmpty()) // If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
currentBatchMode = BatchMode.OFF;
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
currentBatchMode = BatchMode.OFF;
entries.add(entry);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} processing {} entries: {}", this, entries.size(), entries);
if (entries.isEmpty())
{
if (batchMode != BatchMode.AUTO)
{ {
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
releaseAggregate(); releaseAggregate();
// We may have the FLUSH_FRAME to notify.
succeedEntries();
return Action.IDLE; return Action.IDLE;
} }
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
@Override
protected Action process() throws Exception
{
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
while ((entries.size() <= maxGather) && !queue.isEmpty())
{
FrameEntry entry = queue.poll();
currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
{
currentBatchMode = BatchMode.OFF;
}
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
{
currentBatchMode = BatchMode.OFF;
}
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
{
currentBatchMode = BatchMode.OFF;
}
entries.add(entry);
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ LOG.debug("{} auto flushing", this);
LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
}
if (entries.isEmpty()) return flush();
{
if (batchMode != BatchMode.AUTO)
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
releaseAggregate();
return Action.IDLE;
}
LOG.debug("{} auto flushing",FrameFlusher.this);
return flush();
}
batchMode = currentBatchMode;
return currentBatchMode == BatchMode.OFF?flush():batch();
} }
private void releaseAggregate() batchMode = currentBatchMode;
return currentBatchMode == BatchMode.OFF ? flush() : batch();
}
private Action batch()
{
if (aggregate == null)
{ {
if ((aggregate != null) && BufferUtil.isEmpty(aggregate)) aggregate = bufferPool.acquire(bufferSize, true);
{ if (LOG.isDebugEnabled())
bufferPool.release(aggregate); LOG.debug("{} acquired aggregate buffer {}", this, aggregate);
aggregate = null;
}
} }
@Override for (FrameEntry entry : entries)
public void succeeded()
{ {
entry.generateHeaderBytes(aggregate);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
BufferUtil.append(aggregate, payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", this, entries.size(), entries);
// We just aggregated the entries, so we need to succeed their callbacks.
succeeded();
return Action.SCHEDULED;
}
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
buffers.add(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} flushing aggregate {}", this, aggregate);
}
for (FrameEntry entry : entries)
{
// Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME)
continue;
buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
buffers.add(payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} flushing {} frames: {}", this, entries.size(), entries);
if (buffers.isEmpty())
{
releaseAggregate();
// We may have the FLUSH_FRAME to notify.
succeedEntries(); succeedEntries();
super.succeeded(); return Action.IDLE;
} }
private void succeedEntries() endPoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
private int getQueueSize()
{
synchronized (this)
{ {
// Do not allocate the iterator here. return queue.size();
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
notifyCallbackSuccess(entry.callback);
entry.release();
}
entries.clear();
} }
} }
@Override
public void succeeded()
{
succeedEntries();
super.succeeded();
}
private void succeedEntries()
{
for (FrameEntry entry : entries)
{
notifyCallbackSuccess(entry.callback);
entry.release();
if (entry.frame.getOpCode() == OpCode.CLOSE)
{
terminate(new ClosedChannelException(), true);
endPoint.shutdownOutput();
}
}
entries.clear();
}
@Override
public void onCompleteFailure(Throwable failure)
{
releaseAggregate();
Throwable closed;
synchronized (this)
{
closed = terminated;
if (closed == null)
terminated = failure;
entries.addAll(queue);
queue.clear();
}
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback, failure);
entry.release();
}
entries.clear();
}
private void releaseAggregate()
{
if (BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
}
}
void terminate(Throwable cause, boolean close)
{
Throwable reason;
synchronized (this)
{
closed = close;
reason = terminated;
if (reason == null)
terminated = cause;
}
if (LOG.isDebugEnabled())
LOG.debug("{} {}", reason == null ? "Terminating" : "Terminated", this);
if (reason == null && !close)
iterate();
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
{
callback.writeSuccess();
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
{
callback.writeFailed(failure);
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
return String.format("%s@%x[queueSize=%d,aggregateSize=%d,terminated=%s]",
getClass().getSimpleName(),
hashCode(),
getQueueSize(),
aggregate == null ? 0 : aggregate.position(),
terminated);
}
private class FrameEntry private class FrameEntry
{ {
private final Frame frame; private final Frame frame;
@ -266,7 +362,7 @@ public class FrameFlusher
private void generateHeaderBytes(ByteBuffer buffer) private void generateHeaderBytes(ByteBuffer buffer)
{ {
generator.generateHeaderBytes(frame,buffer); generator.generateHeaderBytes(frame, buffer);
} }
private void release() private void release()
@ -281,162 +377,7 @@ public class FrameFlusher
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure); return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, terminated);
} }
} }
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final ByteBufferPool bufferPool;
private final EndPoint endpoint;
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher;
private boolean closed = false;
private volatile Throwable failure;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endpoint = endpoint;
this.bufferSize = bufferSize;
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
this.flusher = new Flusher(maxGather);
}
public void close()
{
List<FrameEntry> entries;
EOFException eof;
synchronized (lock)
{
if (closed)
{
// Already closed
return;
}
closed = true;
LOG.debug("{} closing {}", this);
eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
entries = new ArrayList<>();
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback, eof);
}
}
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
Throwable tosser = null;
synchronized(lock)
{
if (closed)
{
tosser = new EOFException("Connection has been closed locally");
}
else if (failure != null)
{
tosser = failure;
}
else
{
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (OpCode.isControlFrame(frame.getOpCode()))
{
if (frame.getOpCode() == OpCode.CLOSE)
{
// CLOSE after last frame
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed = true;
queue.offer(entry);
}
else
{
// PING and PONG are prepended
queue.offerFirst(entry);
}
}
else
{
// Data Frame
queue.offer(entry);
}
}
}
if (tosser != null)
{
// Notify outside of lock
notifyCallbackFailure(callback, tosser);
}
else
{
// Iterate outside of lock
flusher.iterate();
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
{
callback.writeFailed(failure);
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying failure of callback " + callback,x);
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
{
callback.writeSuccess();
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying success of callback " + callback,x);
}
}
protected void onFailure(Throwable x)
{
LOG.warn(x);
}
@Override
public String toString()
{
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
failure);
}
} }