Replacing FrameBytes tree with WriteBytesProvider class
This commit is contained in:
parent
ecb472f30b
commit
e308f843db
|
@ -104,7 +104,7 @@ public abstract class EventDriver implements IncomingFrames
|
|||
onClose(close);
|
||||
|
||||
// process handshake
|
||||
if (session.getConnection().getIOState().onCloseHandshake(true,close))
|
||||
if (session.getConnection().getIOState().onCloseHandshake(true))
|
||||
{
|
||||
// handshake resolved, disconnect.
|
||||
session.getConnection().disconnect();
|
||||
|
|
|
@ -26,13 +26,11 @@ import java.util.List;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.ForkInvoker;
|
||||
|
@ -52,7 +50,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
|
|||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
|
||||
|
@ -61,139 +58,19 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
|
|||
*/
|
||||
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection
|
||||
{
|
||||
private class ControlFrameBytes extends FrameBytes
|
||||
private class FlushCallback implements Callback
|
||||
{
|
||||
private ByteBuffer buffer;
|
||||
private ByteBuffer origPayload;
|
||||
|
||||
public ControlFrameBytes(Frame frame, Callback childCallback)
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
super(frame,childCallback);
|
||||
LOG.warn("Write flush failure",x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeWrite()
|
||||
public void succeeded()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("completeWrite() - frame: {}",frame);
|
||||
}
|
||||
|
||||
if (buffer != null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Releasing Buffer: {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
|
||||
getBufferPool().release(buffer);
|
||||
buffer = null;
|
||||
}
|
||||
|
||||
queue.remove(this);
|
||||
super.completeFrame();
|
||||
|
||||
if (frame.getType().getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
CloseInfo close = new CloseInfo(origPayload,false);
|
||||
// TODO: change into DisconnectWebSocketCallback
|
||||
onWriteWebSocketClose(close);
|
||||
}
|
||||
|
||||
getBufferPool().release(origPayload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(STARTED);
|
||||
if (buffer == null)
|
||||
{
|
||||
if (frame.hasPayload())
|
||||
{
|
||||
int len = frame.getPayload().remaining();
|
||||
origPayload = getBufferPool().acquire(len,false);
|
||||
BufferUtil.put(frame.getPayload(),origPayload);
|
||||
}
|
||||
buffer = getGenerator().generate(frame);
|
||||
}
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
private class DataFrameBytes extends FrameBytes
|
||||
{
|
||||
private ByteBuffer buffer;
|
||||
|
||||
public DataFrameBytes(Frame frame, Callback childCallback)
|
||||
{
|
||||
super(frame,childCallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeWrite()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("completeWrite() - frame.remaining() = {}",frame.remaining());
|
||||
}
|
||||
|
||||
if (buffer != null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Releasing Buffer: {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
|
||||
getBufferPool().release(buffer);
|
||||
buffer = null;
|
||||
}
|
||||
|
||||
if (frame.remaining() > 0)
|
||||
{
|
||||
LOG.debug("More to send");
|
||||
// We have written a partial frame per windowing size.
|
||||
// We need to keep the correct ordering of frames, to avoid that another
|
||||
// Data frame for the same stream is written before this one is finished.
|
||||
super.completeWrite();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Send complete");
|
||||
synchronized (queue)
|
||||
{
|
||||
queue.remove(this);
|
||||
}
|
||||
// TODO: Notify the rest of the callback chain (extension, close/disconnect, and user callbacks)
|
||||
completeFrame();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
try
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(STARTED);
|
||||
int windowSize = getInputBufferSize();
|
||||
buffer = getGenerator().generate(windowSize,frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
failFrame(x);
|
||||
return null;
|
||||
}
|
||||
// Lets process the next set of bytes...
|
||||
AbstractWebSocketConnection.this.complete(writeBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,7 +84,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public void call(Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
flush();
|
||||
}
|
||||
|
||||
|
@ -219,7 +95,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
callback.succeeded();
|
||||
flush();
|
||||
}
|
||||
});
|
||||
|
@ -232,154 +107,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
public abstract class FrameBytes implements Callback
|
||||
private class OnCloseCallback implements WriteCallback
|
||||
{
|
||||
// no bytes have yet been flushed
|
||||
public int UNSTARTED = 0;
|
||||
// some bytes have been provided for being flushed
|
||||
public int STARTED = 1;
|
||||
// all bytes have been flushed
|
||||
public int FINISHED = 2;
|
||||
// is in failure state
|
||||
public int FAILED = 3;
|
||||
|
||||
protected final Logger LOG;
|
||||
protected final Frame frame;
|
||||
protected final Callback childCallback;
|
||||
protected final AtomicInteger state = new AtomicInteger(UNSTARTED);
|
||||
|
||||
public FrameBytes(Frame frame, Callback childCallback)
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
this.LOG = Log.getLogger(this.getClass());
|
||||
this.frame = frame;
|
||||
this.childCallback = childCallback;
|
||||
}
|
||||
|
||||
public void completeFrame()
|
||||
{
|
||||
LOG.debug("completeFrame() {}",this);
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(FINISHED);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Completed Write of {} ({} frame(s) in queue)",this,queue.size());
|
||||
}
|
||||
flushing = false;
|
||||
}
|
||||
AbstractWebSocketConnection.this.complete(childCallback);
|
||||
}
|
||||
|
||||
public void completeWrite()
|
||||
{
|
||||
// handle reflush.
|
||||
if (isUnfinished())
|
||||
{
|
||||
AbstractWebSocketConnection.this.complete(this);
|
||||
}
|
||||
disconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
public void writeSuccess()
|
||||
{
|
||||
if (this == obj)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (obj == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
FrameBytes other = (FrameBytes)obj;
|
||||
if (frame == null)
|
||||
{
|
||||
if (other.frame != null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else if (!frame.equals(other.frame))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write failure
|
||||
*/
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// Log failure
|
||||
if (x instanceof EofException)
|
||||
{
|
||||
// Abbreviate the EofException
|
||||
LOG.warn("failed() - " + EofException.class);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.warn("failed()",x);
|
||||
}
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(FAILED);
|
||||
flushing = false;
|
||||
queue.fail(x);
|
||||
}
|
||||
failFrame(x);
|
||||
}
|
||||
|
||||
public void failFrame(Throwable t)
|
||||
{
|
||||
failed(t);
|
||||
flush();
|
||||
}
|
||||
|
||||
public abstract ByteBuffer getByteBuffer();
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = (prime * result) + ((frame == null)?0:frame.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the FrameBytes have been started, but not yet finished
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean isUnfinished()
|
||||
{
|
||||
return (state.get() == STARTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write success
|
||||
*/
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
LOG.debug("succeeded() {}",this);
|
||||
completeWrite();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return frame.toString();
|
||||
onWriteWebSocketClose();
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
|
||||
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");
|
||||
|
||||
private final ForkInvoker<Callback> invoker = new FlushInvoker();
|
||||
private final ByteBufferPool bufferPool;
|
||||
|
@ -387,7 +130,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private final Generator generator;
|
||||
private final Parser parser;
|
||||
private final WebSocketPolicy policy;
|
||||
private final FrameQueue queue = new FrameQueue();
|
||||
private final WriteBytesProvider writeBytes;
|
||||
private final AtomicBoolean suspendToken;
|
||||
private WebSocketSession session;
|
||||
private List<ExtensionConfig> extensions;
|
||||
|
@ -407,6 +150,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
this.suspendToken = new AtomicBoolean(false);
|
||||
this.ioState = new IOState();
|
||||
this.ioState.setState(ConnectionState.CONNECTING);
|
||||
this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -423,14 +167,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
public void complete(final Callback callback)
|
||||
{
|
||||
if (callback == null)
|
||||
LOG.debug("complete({})",callback);
|
||||
synchronized (writeBytes)
|
||||
{
|
||||
flushing = false;
|
||||
}
|
||||
|
||||
if (!ioState.isOpen() || (callback == null))
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ioState.isOpen())
|
||||
{
|
||||
invoker.invoke(callback);
|
||||
}
|
||||
|
||||
invoker.invoke(callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -465,9 +213,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
*/
|
||||
private void enqueClose(int statusCode, String reason)
|
||||
{
|
||||
synchronized (writeBytes)
|
||||
{
|
||||
// It is possible to get close events from many different sources.
|
||||
// Make sure we only sent 1 over the network.
|
||||
if (writeBytes.isClosed())
|
||||
{
|
||||
// already sent the close
|
||||
return;
|
||||
}
|
||||
}
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
// TODO: create DisconnectCallback?
|
||||
outgoingFrame(close.asFrame(),null);
|
||||
outgoingFrame(close.asFrame(),new OnCloseCallback());
|
||||
}
|
||||
|
||||
private void execute(Runnable task)
|
||||
|
@ -484,39 +242,34 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
public void flush()
|
||||
{
|
||||
FrameBytes frameBytes = null;
|
||||
ByteBuffer buffer = null;
|
||||
|
||||
synchronized (queue)
|
||||
synchronized (writeBytes)
|
||||
{
|
||||
if (queue.isFailed())
|
||||
if (writeBytes.isFailed())
|
||||
{
|
||||
LOG.debug(".flush() - queue is in failed state");
|
||||
return;
|
||||
}
|
||||
|
||||
if (flushing || queue.isEmpty())
|
||||
if (flushing)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug(".flush() - flushing={} - queue.size = {}",flushing,queue.size());
|
||||
LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
|
||||
}
|
||||
|
||||
frameBytes = queue.peek();
|
||||
|
||||
if (!isOpen())
|
||||
{
|
||||
// No longer have an open connection, drop them all.
|
||||
queue.fail(new WebSocketException("Connection closed"));
|
||||
writeBytes.failAll(new WebSocketException("Connection closed"));
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("Next FrameBytes: {}",frameBytes);
|
||||
|
||||
buffer = frameBytes.getByteBuffer();
|
||||
buffer = writeBytes.getByteBuffer();
|
||||
|
||||
if (buffer == null)
|
||||
{
|
||||
|
@ -527,11 +280,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
|
||||
}
|
||||
}
|
||||
|
||||
write(buffer,frameBytes);
|
||||
write(buffer);
|
||||
}
|
||||
|
||||
public ByteBufferPool getBufferPool()
|
||||
|
@ -653,9 +406,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return true;
|
||||
}
|
||||
|
||||
public void onWriteWebSocketClose(CloseInfo close)
|
||||
public void onWriteWebSocketClose()
|
||||
{
|
||||
if (ioState.onCloseHandshake(false,close))
|
||||
if (ioState.onCloseHandshake(false))
|
||||
{
|
||||
disconnect();
|
||||
}
|
||||
|
@ -669,34 +422,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("outgoingFrame({}, callback)",frame);
|
||||
LOG.debug("outgoingFrame({}, {})",frame,callback);
|
||||
}
|
||||
|
||||
synchronized (queue)
|
||||
if (!isOpen())
|
||||
{
|
||||
FrameBytes bytes = null;
|
||||
Callback jettyCallback = WriteCallbackWrapper.wrap(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.getType().isControl())
|
||||
{
|
||||
bytes = new ControlFrameBytes(frame,jettyCallback);
|
||||
}
|
||||
else
|
||||
{
|
||||
bytes = new DataFrameBytes(frame,jettyCallback);
|
||||
}
|
||||
|
||||
if (isOpen())
|
||||
{
|
||||
if (frame.getType().getOpCode() == OpCode.PING)
|
||||
{
|
||||
queue.prepend(bytes);
|
||||
}
|
||||
else
|
||||
{
|
||||
queue.append(bytes);
|
||||
}
|
||||
}
|
||||
synchronized (writeBytes)
|
||||
{
|
||||
writeBytes.enque(frame,WriteCallbackWrapper.wrap(callback));
|
||||
}
|
||||
|
||||
flush();
|
||||
|
@ -784,29 +520,23 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
|
||||
}
|
||||
|
||||
private <C> void write(ByteBuffer buffer, FrameBytes frameBytes)
|
||||
private <C> void write(ByteBuffer buffer)
|
||||
{
|
||||
EndPoint endpoint = getEndPoint();
|
||||
|
||||
if (LOG_FRAMES.isDebugEnabled())
|
||||
{
|
||||
LOG_FRAMES.debug("{} Writing {} of {} ",policy.getBehavior(),BufferUtil.toDetailString(buffer),frameBytes);
|
||||
}
|
||||
|
||||
if (!isOpen())
|
||||
{
|
||||
// connection is closed, STOP WRITING, geez.
|
||||
frameBytes.failed(new WebSocketException("Connection closed"));
|
||||
writeBytes.failAll(new IOException("Connection closed"));
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
endpoint.write(frameBytes,buffer);
|
||||
endpoint.write(writeBytes,buffer);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
frameBytes.failed(t);
|
||||
writeBytes.failed(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.FrameBytes;
|
||||
|
||||
/**
|
||||
* Queue for outgoing frames.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class FrameQueue extends LinkedList<FrameBytes>
|
||||
{
|
||||
private Throwable failure;
|
||||
|
||||
public void append(FrameBytes bytes)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
bytes.failed(failure);
|
||||
return;
|
||||
}
|
||||
addLast(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void fail(Throwable t)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// already failed.
|
||||
return;
|
||||
}
|
||||
|
||||
failure = t;
|
||||
|
||||
for (FrameBytes fb : this)
|
||||
{
|
||||
fb.failed(failure);
|
||||
}
|
||||
|
||||
clear();
|
||||
}
|
||||
}
|
||||
|
||||
public Throwable getFailure()
|
||||
{
|
||||
return failure;
|
||||
}
|
||||
|
||||
public boolean isFailed()
|
||||
{
|
||||
return (failure != null);
|
||||
}
|
||||
|
||||
public void prepend(FrameBytes bytes)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
bytes.failed(failure);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: make sure that we don't go in front of started but not yet finished frames.
|
||||
addFirst(bytes);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.websocket.common.io;
|
|||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
||||
/**
|
||||
|
@ -28,15 +30,19 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
|
|||
*/
|
||||
public class FutureWriteCallback extends FutureCallback implements WriteCallback
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(FutureWriteCallback.class);
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable cause)
|
||||
{
|
||||
LOG.debug(".writeFailed",cause);
|
||||
failed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
LOG.debug(".writeSuccess");
|
||||
succeeded();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
|
||||
/**
|
||||
|
@ -86,6 +85,11 @@ public class IOState
|
|||
return (isInputClosed() && isOutputClosed());
|
||||
}
|
||||
|
||||
public boolean isCloseInitiated()
|
||||
{
|
||||
return remoteCloseInitiated.get() || localCloseInitiated.get();
|
||||
}
|
||||
|
||||
public boolean isInputClosed()
|
||||
{
|
||||
return inputClosed.get();
|
||||
|
@ -110,7 +114,7 @@ public class IOState
|
|||
* the close details.
|
||||
* @return true if connection should be disconnected now, or false if response to close should be issued.
|
||||
*/
|
||||
public boolean onCloseHandshake(boolean incoming, CloseInfo close)
|
||||
public boolean onCloseHandshake(boolean incoming)
|
||||
{
|
||||
boolean in = inputClosed.get();
|
||||
boolean out = outputClosed.get();
|
||||
|
@ -135,7 +139,7 @@ public class IOState
|
|||
}
|
||||
}
|
||||
|
||||
LOG.debug("onCloseHandshake({},{}), input={}, output={}",incoming,close,in,out);
|
||||
LOG.debug("onCloseHandshake({}), input={}, output={}",incoming,in,out);
|
||||
|
||||
if (in && out)
|
||||
{
|
||||
|
@ -144,10 +148,6 @@ public class IOState
|
|||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* if (close.isHarsh()) { LOG.debug("Close status code was harsh, disconnecting"); return true; }
|
||||
*/
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -161,14 +161,9 @@ public class IOState
|
|||
this.state = state;
|
||||
}
|
||||
|
||||
public boolean isCloseInitiated()
|
||||
public boolean wasCleanClose()
|
||||
{
|
||||
return remoteCloseInitiated.get() || localCloseInitiated.get();
|
||||
}
|
||||
|
||||
public boolean wasRemoteCloseInitiated()
|
||||
{
|
||||
return remoteCloseInitiated.get();
|
||||
return cleanClose.get();
|
||||
}
|
||||
|
||||
public boolean wasLocalCloseInitiated()
|
||||
|
@ -176,8 +171,8 @@ public class IOState
|
|||
return localCloseInitiated.get();
|
||||
}
|
||||
|
||||
public boolean wasCleanClose()
|
||||
public boolean wasRemoteCloseInitiated()
|
||||
{
|
||||
return cleanClose.get();
|
||||
return remoteCloseInitiated.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,11 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
@ -38,7 +40,6 @@ public class WriteBytesProvider implements Callback
|
|||
{
|
||||
private class FrameEntry
|
||||
{
|
||||
private final Logger LOG = Log.getLogger(FrameEntry.class);
|
||||
protected final Frame frame;
|
||||
protected final Callback callback;
|
||||
|
||||
|
@ -59,6 +60,8 @@ public class WriteBytesProvider implements Callback
|
|||
}
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
|
||||
|
||||
/** The websocket generator */
|
||||
private final Generator generator;
|
||||
/** Flush callback, for notifying when a flush should be performed */
|
||||
|
@ -73,6 +76,8 @@ public class WriteBytesProvider implements Callback
|
|||
private Throwable failure;
|
||||
/** The last requested buffer */
|
||||
private ByteBuffer buffer;
|
||||
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
|
||||
private AtomicBoolean closed;
|
||||
|
||||
/**
|
||||
* Create a WriteBytesProvider with specified Generator and "flush" Callback.
|
||||
|
@ -89,28 +94,48 @@ public class WriteBytesProvider implements Callback
|
|||
this.generator = Objects.requireNonNull(generator);
|
||||
this.flushCallback = Objects.requireNonNull(flushCallback);
|
||||
this.queue = new LinkedList<>();
|
||||
this.closed = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Append a Frame & Callback to the pending queue.
|
||||
*
|
||||
* @param frame
|
||||
* the frame to add
|
||||
* @param callback
|
||||
* the optional callback for the frame write (can be null)
|
||||
*/
|
||||
public void append(Frame frame, Callback callback)
|
||||
public void enque(Frame frame, Callback callback)
|
||||
{
|
||||
Objects.requireNonNull(frame);
|
||||
LOG.debug("enque({}, {})",frame,callback);
|
||||
synchronized (this)
|
||||
{
|
||||
if (closed.get())
|
||||
{
|
||||
// Closed for more frames.
|
||||
LOG.debug("Write is closed: {}",frame,callback);
|
||||
if (callback != null)
|
||||
{
|
||||
callback.failed(new IOException("Write is closed"));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
notifyFailure(callback);
|
||||
return;
|
||||
}
|
||||
queue.addLast(new FrameEntry(frame,callback));
|
||||
|
||||
FrameEntry entry = new FrameEntry(frame,callback);
|
||||
|
||||
switch (frame.getType())
|
||||
{
|
||||
case PING:
|
||||
queue.addFirst(entry);
|
||||
break;
|
||||
case CLOSE:
|
||||
closed.set(true);
|
||||
// drop the rest of the queue?
|
||||
queue.addLast(entry);
|
||||
break;
|
||||
default:
|
||||
queue.addLast(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,6 +216,19 @@ public class WriteBytesProvider implements Callback
|
|||
return failure;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
|
||||
*
|
||||
* @return true if close frame has been enqueued already.
|
||||
*/
|
||||
public boolean isClosed()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return closed.get();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isFailed()
|
||||
{
|
||||
return (failure != null);
|
||||
|
@ -211,30 +249,6 @@ public class WriteBytesProvider implements Callback
|
|||
callback.failed(failure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepend a Frame & Callback to the pending queue.
|
||||
*
|
||||
* @param frame
|
||||
* the frame to add
|
||||
* @param callback
|
||||
* the optional callback for the frame write (can be null)
|
||||
*/
|
||||
public void prepend(Frame frame, Callback callback)
|
||||
{
|
||||
Objects.requireNonNull(frame);
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
notifyFailure(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
queue.addFirst(new FrameEntry(frame,callback));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the buffer size used for generating ByteBuffers from the frames.
|
||||
* <p>
|
||||
|
@ -267,6 +281,13 @@ public class WriteBytesProvider implements Callback
|
|||
if (active.frame.remaining() <= 0)
|
||||
{
|
||||
// All done with active FrameEntry
|
||||
if (active.callback != null)
|
||||
{
|
||||
// notify of success
|
||||
active.callback.succeeded();
|
||||
}
|
||||
|
||||
// null it out
|
||||
active = null;
|
||||
}
|
||||
|
||||
|
@ -274,4 +295,24 @@ public class WriteBytesProvider implements Callback
|
|||
flushCallback.succeeded();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append("WriteBytesProvider[");
|
||||
b.append("flushCallback=").append(flushCallback);
|
||||
if (isFailed())
|
||||
{
|
||||
b.append(",FAILURE=").append(failure.getClass().getName());
|
||||
b.append(",").append(failure.getMessage());
|
||||
}
|
||||
else
|
||||
{
|
||||
b.append(",active=").append(active);
|
||||
b.append(",queue.size=").append(queue.size());
|
||||
}
|
||||
b.append(']');
|
||||
return b.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
|
||||
|
||||
public class WebSocketServerConnection extends AbstractWebSocketConnection
|
||||
|
@ -73,7 +72,7 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onWriteWebSocketClose(CloseInfo close)
|
||||
public void onWriteWebSocketClose()
|
||||
{
|
||||
// as server, always disconnect if writing close
|
||||
disconnect();
|
||||
|
|
|
@ -263,7 +263,6 @@ public class TestABCase2 extends AbstractABCase
|
|||
@Test
|
||||
public void testCase2_6() throws Exception
|
||||
{
|
||||
System.err.println("==================================================================================================");
|
||||
byte payload[] = new byte[125];
|
||||
Arrays.fill(payload,(byte)'6');
|
||||
|
||||
|
|
|
@ -180,7 +180,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
|
|||
{
|
||||
CloseInfo close = new CloseInfo(statusCode,message);
|
||||
|
||||
if (ioState.onCloseHandshake(false,close))
|
||||
if (ioState.onCloseHandshake(false))
|
||||
{
|
||||
this.disconnect();
|
||||
}
|
||||
|
@ -364,7 +364,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
|
|||
if (frame.getType() == Frame.Type.CLOSE)
|
||||
{
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
if (ioState.onCloseHandshake(true,close))
|
||||
if (ioState.onCloseHandshake(true))
|
||||
{
|
||||
this.disconnect();
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
# org.eclipse.jetty.LEVEL=WARN
|
||||
|
||||
org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.LEVEL=INFO
|
||||
# org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.LEVEL=WARN
|
||||
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
|
||||
|
|
|
@ -126,15 +126,8 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC
|
|||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Async write the message back.
|
||||
member.connection.write(data);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
getServletContext().log("write failed",e);
|
||||
}
|
||||
// Async write the message back.
|
||||
member.connection.write(data);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue