Fleshing out the write methods on AsyncWebSocketConnection

This commit is contained in:
Joakim Erdfelt 2012-06-28 15:07:39 -07:00
parent 256dec3e91
commit d2561acde4
6 changed files with 149 additions and 36 deletions

View File

@ -1,7 +1,7 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
@ -16,8 +16,13 @@ public interface WebSocketConnection
* Terminate connection, {@link StatusCode#NORMAL}, without a reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
* @see #close(int, String)
*/
void close();
void close() throws IOException;
/**
* Terminate connection, with status code.
@ -28,9 +33,11 @@ public interface WebSocketConnection
* the status code
* @param reason
* the (optional) reason. (can be null for no reason)
* @throws IOException
* if unable to send the close frame, or close the connection successfully.
* @see StatusCode
*/
void close(int statusCode, String reason);
void close(int statusCode, String reason) throws IOException;
/**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.
@ -44,7 +51,7 @@ public interface WebSocketConnection
*
* @return the remote address if available. (situations like mux extension and proxying makes this information unreliable)
*/
InetAddress getRemoteAddress();
InetSocketAddress getRemoteAddress();
/**
* Get the SubProtocol in use for this connection.

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.OpCode;
@ -16,12 +18,33 @@ public class BinaryFrame extends DataFrame
super(OpCode.BINARY);
}
/**
* Construct with byte array payload data
*/
public BinaryFrame( byte[] payload )
{
this();
setPayload(payload);
}
/**
* Construct with partial byte array payload data support
*/
public BinaryFrame(byte[] data, int offset, int length)
{
this();
setPayload(ByteBuffer.wrap(data,offset,length));
}
/**
* Construct with ByteBuffer payload data
*/
public BinaryFrame(ByteBuffer payload)
{
this();
setPayload(payload);
}
@Override
public String toString()
{

View File

@ -7,8 +7,36 @@ import org.eclipse.jetty.websocket.api.PolicyViolationException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.BaseFrame;
/**
* <pre>
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-------+-+-------------+-------------------------------+
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
* |I|S|S|S| (4) |A| (7) | (16/64) |
* |N|V|V|V| |S| | (if payload len==126/127) |
* | |1|2|3| |K| | |
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
* | Extended payload length continued, if payload len == 127 |
* + - - - - - - - - - - - - - - - +-------------------------------+
* | |Masking-key, if MASK set to 1 |
* +-------------------------------+-------------------------------+
* | Masking-key (continued) | Payload Data |
* +-------------------------------- - - - - - - - - - - - - - - - +
* : Payload Data continued ... :
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
* | Payload Data continued ... |
* +---------------------------------------------------------------+
* </pre>
*
* @param <T>
*/
public abstract class FrameGenerator<T extends BaseFrame>
{
/**
* The overhead (maximum) for a framing header. Assuming a maximum sized payload with masking key.
*/
public static final int OVERHEAD = 28;
private final WebSocketPolicy policy;
protected FrameGenerator(WebSocketPolicy policy)

View File

@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.frames.BaseFrame;
* </pre>
*/
public class Generator {
private final EnumMap<OpCode, FrameGenerator<?>> generators = new EnumMap<>(OpCode.class);
public Generator(WebSocketPolicy policy)

View File

@ -1,7 +1,7 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
@ -44,7 +44,7 @@ public class LocalWebSocketConnection implements WebSocketConnection
}
@Override
public InetAddress getRemoteAddress()
public InetSocketAddress getRemoteAddress()
{
return null;
}

View File

@ -1,7 +1,7 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -14,6 +14,7 @@ import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.ExtensionConfig;
@ -21,7 +22,10 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.BinaryFrame;
import org.eclipse.jetty.websocket.frames.CloseFrame;
import org.eclipse.jetty.websocket.frames.TextFrame;
import org.eclipse.jetty.websocket.generator.FrameGenerator;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.server.callbacks.WebSocketCloseCallback;
@ -62,17 +66,15 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
}
@Override
public void close()
public void close() throws IOException
{
// TODO Auto-generated method stub
write(new CloseFrame(StatusCode.NORMAL));
}
@Override
public void close(int statusCode, String reason)
public void close(int statusCode, String reason) throws IOException
{
// TODO Auto-generated method stub
write(new CloseFrame(statusCode,reason));
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
@ -108,15 +110,13 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
@Override
public WebSocketPolicy getPolicy()
{
// TODO Auto-generated method stub
return null;
return this.policy;
}
@Override
public InetAddress getRemoteAddress()
public InetSocketAddress getRemoteAddress()
{
// TODO Auto-generated method stub
return null;
return getEndPoint().getRemoteAddress();
}
@Override
@ -129,8 +129,7 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
@Override
public boolean isOpen()
{
// TODO Auto-generated method stub
return false;
return getEndPoint().isOpen();
}
@Override
@ -194,8 +193,7 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
*/
private void terminateConnection(short statusCode, String reason)
{
CloseFrame close = new CloseFrame(statusCode);
close.setReason(reason);
CloseFrame close = new CloseFrame(statusCode,reason);
// fire and forget -> close frame
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
@ -206,49 +204,107 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
@Override
public void write(BaseFrame frame) throws IOException
{
// TODO Auto-generated method stub
if (frame == null)
{
// nothing to write
return;
}
ByteBuffer raw = ByteBuffer.allocate(frame.getPayloadLength() + FrameGenerator.OVERHEAD);
generator.generate(raw,frame);
Callback<Void> nop = new FutureCallback<>(); // TODO: add buffer release callback?
getEndPoint().write(null,nop,raw);
}
@Override
public void write(byte[] data, int offset, int length) throws IOException
{
// TODO Auto-generated method stub
write(new BinaryFrame(data,offset,length));
}
@Override
public void write(ByteBuffer... buffers) throws IOException
{
// TODO Auto-generated method stub
int len = buffers.length;
if (len == 0)
{
// nothing to write
return;
}
ByteBuffer raw[] = new ByteBuffer[len];
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false);
BinaryFrame frame = new BinaryFrame(buffers[i]);
generator.generate(raw[i],frame);
}
Callback<Void> nop = new FutureCallback<>(); // TODO: add buffer release callback?
getEndPoint().write(null,nop,raw);
}
@Override
public <C> void write(C context, Callback<C> callback, BaseFrame... frames) throws IOException
{
// TODO Auto-generated method stub
int len = frames.length;
if (len == 0)
{
// nothing to write
return;
}
ByteBuffer raw[] = new ByteBuffer[len];
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(frames[i].getPayloadLength() + FrameGenerator.OVERHEAD,false);
generator.generate(raw[i],frames[i]);
}
getEndPoint().write(context,callback,raw);
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IOException
{
// TODO Auto-generated method stub
int len = buffers.length;
if (len == 0)
{
// nothing to write
return;
}
ByteBuffer raw[] = new ByteBuffer[len];
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false);
BinaryFrame frame = new BinaryFrame(buffers[i]);
generator.generate(raw[i],frame);
}
getEndPoint().write(context,callback,raw);
}
@Override
public <C> void write(C context, Callback<C> callback, String... messages) throws IOException
{
// TODO Auto-generated method stub
int len = messages.length;
if (len == 0)
{
// nothing to write
return;
}
TextFrame frames[] = new TextFrame[len];
for (int i = 0; i < len; i++)
{
frames[i] = new TextFrame(messages[i]);
}
write(context,callback,frames);
}
@Override
public void write(String message) throws IOException
{
// TODO Auto-generated method stub
if (message == null)
{
// nothing to write
return;
}
write(new TextFrame(message));
}
}