Refactoring websocket.io.RawConnection to websocket.api.BaseConnection.

+ Introducing BaseConnection.SuspendToken and suspend/resume logic for
  working with buffer suspended read concerns.
This commit is contained in:
Joakim Erdfelt 2012-08-08 12:30:12 -07:00
parent 1a62b2a780
commit 0abb7511ff
12 changed files with 279 additions and 135 deletions

View File

@ -0,0 +1,73 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.net.InetSocketAddress;
public interface BaseConnection
{
/**
* Connection suspend token
*/
public static interface SuspendToken
{
/**
* Resume a previously suspended connection.
*/
void resume();
}
/**
* Terminate connection, {@link StatusCode#NORMAL}, without a reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
* @see #close(int, String)
*/
void close() throws IOException;
/**
* Terminate connection, with status code.
* <p>
* Advanced usage: results in an non-blocking async write, then connection close.
*
* @param statusCode
* the status code
* @param reason
* the (optional) reason. (can be null for no reason)
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
*/
void close(int statusCode, String reason) throws IOException;
/**
* Get the remote Address in use for this connection.
*
* @return the remote address if available. (situations like mux extension and proxying makes this information unreliable)
*/
InetSocketAddress getRemoteAddress();
/**
* Simple test to see if connection is open (and not closed)
*
* @return true if connection still open
*/
boolean isOpen();
/**
* Tests if the connection is actively reading.
*
* @return true if connection is actively attempting to read.
*/
boolean isReading();
/**
* Suspend a the incoming read events on the connection.
*
* @return
*/
SuspendToken suspend();
}

View File

@ -16,7 +16,6 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
@ -24,35 +23,8 @@ import org.eclipse.jetty.util.Callback;
/**
* Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>.
*/
public interface WebSocketConnection
public interface WebSocketConnection extends BaseConnection
{
/**
* Terminate connection, {@link StatusCode#NORMAL}, without a reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
* @see #close(int, String)
*/
void close() throws IOException;
/**
* Terminate connection, with status code.
* <p>
* Advanced usage: results in an non-blocking async write, then connection close.
*
* @param statusCode
* the status code
* @param reason
* the (optional) reason. (can be null for no reason)
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
*/
void close(int statusCode, String reason) throws IOException;
/**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.
*
@ -60,13 +32,6 @@ public interface WebSocketConnection
*/
WebSocketPolicy getPolicy();
/**
* Get the remote Address in use for this connection.
*
* @return the remote address if available. (situations like mux extension and proxying makes this information unreliable)
*/
InetSocketAddress getRemoteAddress();
/**
* Get the SubProtocol in use for this connection.
*
@ -74,13 +39,6 @@ public interface WebSocketConnection
*/
String getSubProtocol();
/**
* Simple test to see if connection is open (and not closed)
*
* @return true if connection still open
*/
boolean isOpen();
/**
* Send a single ping messages.
* <p>

View File

@ -229,12 +229,7 @@ public class WebSocketEventDriver implements IncomingFrames
{
if (events.onText.isStreaming())
{
// Allocate directly, not via ByteBufferPool, as this buffer
// is ultimately controlled by the end user, and we can't know
// when they are done using the stream in order to release any
// buffer allocated from the ByteBufferPool.
ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
this.activeMessage = new MessageReader(buf);
activeMessage = new MessageReader(websocket,events.onBinary,session,policy);
}
else
{

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -33,6 +34,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
@ -47,7 +49,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements RawConnection, OutgoingFrames
public abstract class AbstractWebSocketConnection extends AbstractConnection implements BaseConnection, BaseConnection.SuspendToken, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");
@ -58,9 +60,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final Parser parser;
private final WebSocketPolicy policy;
private final FrameQueue queue;
private final AtomicBoolean suspendToken;
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean flushing;
private boolean isFilling;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
@ -72,6 +76,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.queue = new FrameQueue();
this.suspendToken = new AtomicBoolean(false);
}
@Override
@ -98,7 +103,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void disconnect(boolean onlyOutput)
{
EndPoint endPoint = getEndPoint();
@ -146,7 +150,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
}
write(buffer,this,frameBytes);
write(buffer,frameBytes);
}
public ByteBufferPool getBufferPool()
@ -154,16 +158,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return bufferPool;
}
public Executor getExecutor()
{
return getExecutor();
}
/**
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
*
*
* @return the list of negotiated extensions in use.
*/
public List<ExtensionConfig> getExtensions()
@ -213,6 +212,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return getEndPoint().isOpen();
}
@Override
public boolean isReading()
{
return isFilling;
}
@Override
public void onFillable()
{
@ -221,16 +226,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
boolean readMore = false;
try
{
isFilling = true;
readMore = (read(buffer) != -1);
}
finally
{
bufferPool.release(buffer);
}
if (readMore)
if (readMore && (suspendToken.get() == false))
{
fillInterested();
}
else
{
isFilling = false;
}
}
@Override
@ -296,7 +307,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
else if (filled < 0)
{
LOG.debug("read - EOF Reached");
// disconnect(false); // FIXME Simone says this is bad
return -1;
}
else
@ -323,6 +333,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void resume()
{
if(suspendToken.getAndSet(false)) {
fillInterested();
}
}
private <C> void scheduleTimeout(FrameBytes<C> bytes)
{
if (policy.getIdleTimeout() > 0)
@ -335,7 +353,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
*
*
* @param extensions
* the list of negotiated extensions in use.
*/
@ -349,9 +367,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.session = session;
}
@Override
public SuspendToken suspend()
{
suspendToken.set(true);
return this;
}
/**
* For terminating connections forcefully.
*
*
* @param statusCode
* the WebSocket status code.
* @param reason
@ -373,7 +398,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
}
private <C> void write(ByteBuffer buffer, AbstractWebSocketConnection webSocketConnection, FrameBytes<C> frameBytes)
private <C> void write(ByteBuffer buffer, FrameBytes<C> frameBytes)
{
EndPoint endpoint = getEndPoint();

View File

@ -1,37 +0,0 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Interface for working with connections in a raw way.
* <p>
* This is abstracted out to allow for common access to connection internals regardless of physical vs virtual connections.
*/
public interface RawConnection extends OutgoingFrames
{
void close() throws IOException;
void close(int statusCode, String reason) throws IOException;
void disconnect(boolean onlyOutput);
InetSocketAddress getRemoteAddress();
boolean isOpen();
}

View File

@ -8,6 +8,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -18,17 +19,22 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private final RawConnection connection;
/**
* The reference to the base connection.
* <p>
* This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxConnection when MUX is in the picture.
*/
private final BaseConnection baseConnection;
private final WebSocketPolicy policy;
private final String subprotocol;
private final WebSocketEventDriver websocket;
private OutgoingFrames outgoing;
public WebSocketSession(WebSocketEventDriver websocket, RawConnection connection, WebSocketPolicy policy, String subprotocol)
public WebSocketSession(WebSocketEventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol)
{
super();
this.websocket = websocket;
this.connection = connection;
this.baseConnection = connection;
this.policy = policy;
this.subprotocol = subprotocol;
}
@ -36,13 +42,13 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public void close() throws IOException
{
connection.close();
baseConnection.close();
}
@Override
public void close(int statusCode, String reason) throws IOException
{
connection.close(statusCode,reason);
baseConnection.close(statusCode,reason);
}
public IncomingFrames getIncoming()
@ -64,7 +70,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public InetSocketAddress getRemoteAddress()
{
return connection.getRemoteAddress();
return baseConnection.getRemoteAddress();
}
@Override
@ -90,7 +96,13 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public boolean isOpen()
{
return connection.isOpen();
return baseConnection.isOpen();
}
@Override
public boolean isReading()
{
return baseConnection.isReading();
}
public void onConnect()
@ -117,6 +129,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
// Delegate the application called ping to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
frame.setFin(true);
output(context,callback,frame);
@ -127,14 +141,20 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
this.outgoing = outgoing;
}
@Override
public SuspendToken suspend()
{
return baseConnection.suspend();
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("WebSocketSession[websocket=");
builder.append(websocket);
builder.append(",connection=");
builder.append(connection);
builder.append(",baseConnection=");
builder.append(baseConnection);
builder.append(",subprotocol=");
builder.append(subprotocol);
builder.append(",outgoing=");
@ -153,6 +173,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
}
// Delegate the application called write to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
frame.setFin(true);
output(context,callback,frame);
@ -168,6 +190,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer));
}
// Delegate the application called write to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
frame.setFin(true);
output(context,callback,frame);
@ -183,6 +207,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},message.length:{})",callback,message.length());
}
// Delegate the application called ping to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.text(message);
frame.setFin(true);
output(context,callback,frame);

View File

@ -3,9 +3,23 @@ package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Appender for messages (used for multiple fragments with continuations, and also to allow for streaming APIs)
*/
public interface MessageAppender
{
abstract void appendMessage(ByteBuffer byteBuffer) throws IOException;
/**
* Append the payload to the message.
*
* @param payload
* the payload to append.
* @throws IOException
* if unable to append the payload
*/
abstract void appendMessage(ByteBuffer payload) throws IOException;
abstract void messageComplete() throws IOException;
/**
* Notification that message is to be considered complete.
*/
abstract void messageComplete();
}

View File

@ -30,6 +30,10 @@ import org.eclipse.jetty.websocket.io.WebSocketSession;
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
/**
* Threshold (of bytes) to perform compaction at
*/
private static final int COMPACT_THRESHOLD = 5;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
@ -38,7 +42,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
private final ByteBuffer buf;
private int size;
private boolean finished;
private boolean needsNotification = true;
private boolean needsNotification;
private int readPosition;
public MessageInputStream(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
{
@ -49,7 +54,10 @@ public class MessageInputStream extends InputStream implements MessageAppender
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(this.buf);
size = 0;
readPosition = this.buf.position();
finished = false;
needsNotification = true;
}
@Override
@ -72,6 +80,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
synchronized (buf)
{
// TODO: grow buffer till max binary message size?
// TODO: compact this buffer to fit incoming buffer?
// TODO: tell connection to suspend if buffer too full?
BufferUtil.put(payload,buf);
}
@ -85,12 +95,13 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public void close() throws IOException
{
finished = true;
super.close();
this.bufferPool.release(this.buf);
}
@Override
public void messageComplete() throws IOException
public void messageComplete()
{
finished = true;
}
@ -100,13 +111,14 @@ public class MessageInputStream extends InputStream implements MessageAppender
{
synchronized (buf)
{
// FIXME: HACKITY HACK HACK HACK
// Should really use its own tracking of position, to avoid flipping the
// buffer between read/write
byte b = buf.get();
if (buf.limit() <= (buf.capacity() - 5))
byte b = buf.get(readPosition);
readPosition++;
if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
{
int curPos = buf.position();
buf.compact();
int offsetPos = buf.position() - curPos;
readPosition += offsetPos;
}
return b;
}

View File

@ -19,7 +19,10 @@ import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethod;
import org.eclipse.jetty.websocket.io.WebSocketSession;
/**
* Support class for reading text message data as an Reader.
@ -28,32 +31,66 @@ import org.eclipse.jetty.util.BufferUtil;
*/
public class MessageReader extends Reader implements MessageAppender
{
private ByteBuffer buffer;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final WebSocketPolicy policy;
private final Utf8StringBuilder utf;
private int size;
private boolean finished;
private boolean needsNotification;
public MessageReader(ByteBuffer buf)
public MessageReader(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
{
BufferUtil.clearToFill(buf);
this.buffer = buf;
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.policy = policy;
this.utf = new Utf8StringBuilder();
size = 0;
finished = false;
needsNotification = true;
}
@Override
public void appendMessage(ByteBuffer byteBuffer) throws IOException
public void appendMessage(ByteBuffer payload) throws IOException
{
// TODO Auto-generated method stub
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
// empty payload is valid
return;
}
policy.assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
synchronized (utf)
{
utf.append(payload);
}
if (needsNotification)
{
needsNotification = true;
this.onEvent.call(websocket,session,this);
}
}
@Override
public void close() throws IOException
{
// TODO Auto-generated method stub
finished = true;
}
@Override
public void messageComplete() throws IOException
public void messageComplete()
{
// TODO Auto-generated method stub
finished = true;
}
@Override

View File

@ -54,7 +54,7 @@ public class SimpleBinaryMessage implements MessageAppender
}
@Override
public void messageComplete() throws IOException
public void messageComplete()
{
BufferUtil.flipToFlush(this.buf,0);
finished = true;

View File

@ -51,7 +51,7 @@ public class SimpleTextMessage implements MessageAppender
}
@Override
public void messageComplete() throws IOException
public void messageComplete()
{
finished = true;

View File

@ -15,13 +15,16 @@
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.junit.rules.TestName;
public class LocalWebSocketConnection implements RawConnection
public class LocalWebSocketConnection implements WebSocketConnection
{
private final String id;
@ -51,8 +54,9 @@ public class LocalWebSocketConnection implements RawConnection
}
@Override
public void disconnect(boolean onlyOutput)
public WebSocketPolicy getPolicy()
{
return null;
}
@Override
@ -61,6 +65,12 @@ public class LocalWebSocketConnection implements RawConnection
return null;
}
@Override
public String getSubProtocol()
{
return null;
}
@Override
public boolean isOpen()
{
@ -68,8 +78,21 @@ public class LocalWebSocketConnection implements RawConnection
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
public boolean isReading()
{
return false;
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
}
@Override
public SuspendToken suspend()
{
return null;
}
@Override
@ -77,4 +100,22 @@ public class LocalWebSocketConnection implements RawConnection
{
return String.format("%s[%s]",LocalWebSocketConnection.class.getSimpleName(),id);
}
@Override
public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
}
}