tidy up close

This commit is contained in:
Greg Wilkins 2017-04-21 17:14:26 +10:00 committed by Joakim Erdfelt
parent dddf066e90
commit 7148db67dc
1 changed files with 71 additions and 60 deletions

View File

@ -19,19 +19,20 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -158,10 +159,11 @@ public class FrameFlusher
@Override
protected Action process() throws Exception
{
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
try (Locker.Lock l = lock.lock())
{
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
while ((entries.size() <= maxGather) && !queue.isEmpty())
{
FrameEntry entry = queue.poll();
@ -292,10 +294,10 @@ public class FrameFlusher
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final Locker lock = new Locker();
private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher;
private final AtomicBoolean closed = new AtomicBoolean();
private boolean closed = false;
private volatile Throwable failure;
public FrameFlusher(Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
@ -309,20 +311,26 @@ public class FrameFlusher
public void close()
{
if (closed.compareAndSet(false,true))
List<FrameEntry> entries = null;
try(Locker.Lock l = lock.lock())
{
LOG.debug("{} closing {}",this);
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>();
synchronized (lock)
if (!closed)
{
closed = true;
LOG.debug("{} closing {}",this);
entries = new ArrayList<>();
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
}
// Notify outside sync block.
if (entries != null)
{
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,eof);
@ -332,60 +340,63 @@ public class FrameFlusher
public void enqueue(Frame frame, FrameCallback callback, BatchMode batchMode)
{
if (closed.get())
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} discarding/closed {}",this,frame);
}
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
return;
}
if (flusher.isFailed())
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} discarding/failed {}",this,frame);
}
notifyCallbackFailure(callback,failure);
return;
}
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled())
Throwable failed = null;
try (Locker.Lock l = lock.lock())
{
LOG.debug("{} queued {}",this,entry);
}
synchronized (lock)
{
switch (frame.getOpCode())
if (closed)
{
case OpCode.PING:
failed = new EOFException("Connection has been closed locally");
}
else if (flusher.isFailed())
{
failed = failure==null?new IOException():failure;
}
else
{
switch (frame.getOpCode())
{
// Prepend PINGs so they are processed first.
queue.offerFirst(entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.offer(entry);
break;
}
default:
{
queue.offer(entry);
break;
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.offerFirst(entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed = true;
queue.offer(entry);
break;
}
default:
{
queue.offer(entry);
break;
}
}
}
}
flusher.iterate();
if (failed!=null)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} failed {}",this,failed);
}
notifyCallbackFailure(callback,failed);
}
else
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} queued {}",this,entry);
}
flusher.iterate();
}
}
protected void notifyCallbackFailure(FrameCallback callback, Throwable failure)