Issue #272 - Addressing some concerns with multithreading in FrameFlusher

This commit is contained in:
Joakim Erdfelt 2017-10-03 15:20:16 -07:00
parent e2d7057988
commit bb5195192a
1 changed files with 62 additions and 49 deletions

View File

@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -296,7 +295,7 @@ public class FrameFlusher
private final Object lock = new Object(); private final Object lock = new Object();
private final Deque<FrameEntry> queue = new ArrayDeque<>(); private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher; private final Flusher flusher;
private final AtomicBoolean closed = new AtomicBoolean(); private boolean closed = false;
private volatile Throwable failure; private volatile Throwable failure;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather) public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
@ -311,75 +310,89 @@ public class FrameFlusher
public void close() public void close()
{ {
if (closed.compareAndSet(false,true)) List<FrameEntry> entries;
EOFException eof;
synchronized (lock)
{ {
LOG.debug("{} closing {}",this); if (closed)
EOFException eof = new EOFException("Connection has been closed locally"); {
// Already closed
return;
}
closed = true;
LOG.debug("{} closing {}", this);
eof = new EOFException("Connection has been closed locally");
flusher.failed(eof); flusher.failed(eof);
// Fail also queued entries. // Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>(); entries = new ArrayList<>();
synchronized (lock) entries.addAll(queue);
{ queue.clear();
entries.addAll(queue); }
queue.clear();
} // Notify outside sync block.
// Notify outside sync block. for (FrameEntry entry : entries)
for (FrameEntry entry : entries) {
{ notifyCallbackFailure(entry.callback, eof);
notifyCallbackFailure(entry.callback,eof);
}
} }
} }
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode) public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{ {
if (closed.get()) Throwable tosser = null;
{
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
return;
}
if (flusher.isFailed())
{
notifyCallbackFailure(callback,failure);
return;
}
FrameEntry entry = new FrameEntry(frame,callback,batchMode); synchronized(lock)
synchronized (lock)
{ {
switch (frame.getOpCode()) if (closed)
{ {
case OpCode.PING: tosser = new EOFException("Connection has been closed locally");
}
else if (failure != null)
{
tosser = failure;
}
else
{
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (OpCode.isControlFrame(frame.getOpCode()))
{ {
// Prepend PINGs so they are processed first. if (frame.getOpCode() == OpCode.CLOSE)
queue.offerFirst(entry); {
break; // CLOSE after last frame
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed = true;
queue.offer(entry);
}
else
{
// PING and PONG are prepended
queue.offerFirst(entry);
}
} }
case OpCode.CLOSE: else
{ {
// There may be a chance that other frames are // Data Frame
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.offer(entry); queue.offer(entry);
break;
}
default:
{
queue.offer(entry);
break;
} }
} }
} }
if (LOG.isDebugEnabled()) if (tosser != null)
{ {
LOG.debug("{} queued {}",this,entry); // Notify outside of lock
notifyCallbackFailure(callback, tosser);
}
else
{
// Iterate outside of lock
flusher.iterate();
} }
flusher.iterate();
} }
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)