JSR-356 adding basic MessageOutputStream support for writing to an OutputStream

This commit is contained in:
Joakim Erdfelt 2013-05-23 15:51:40 -07:00
parent 97854b6c5b
commit 48dea3bb1d
17 changed files with 586 additions and 81 deletions

View File

@ -37,7 +37,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@ -55,7 +54,6 @@ public class JsrAnnotatedEventDriver extends AbstractEventDriver implements Even
private final EndpointConfig endpointconfig;
private boolean hasCloseBeenCalled = false;
private JsrSession jsrsession;
private MessageAppender activeMessage;
public JsrAnnotatedEventDriver(WebSocketPolicy policy, Object websocket, JsrEvents events, EndpointConfig endpointconfig)
{
@ -127,15 +125,7 @@ public class JsrAnnotatedEventDriver extends AbstractEventDriver implements Even
// Process any active MessageAppender
if (handled && (activeMessage != null))
{
LOG.debug("Appending Binary Message");
activeMessage.appendMessage(buffer,fin);
if (fin)
{
LOG.debug("Binary Message Complete");
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
}
@ -283,15 +273,7 @@ public class JsrAnnotatedEventDriver extends AbstractEventDriver implements Even
// Process any active MessageAppender
if (handled && (activeMessage != null))
{
LOG.debug("Appending Text Message");
activeMessage.appendMessage(buffer,fin);
if (fin)
{
LOG.debug("Text Message Complete");
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
}

View File

@ -40,7 +40,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.jsr356.ContainerService;
import org.eclipse.jetty.websocket.jsr356.Decoders;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
@ -65,7 +64,6 @@ public class JsrEndpointEventDriver extends AbstractEventDriver implements Event
private final Endpoint endpoint;
private JsrSession jsrsession;
private EndpointConfig endpointconfig;
private MessageAppender activeMessage;
private boolean hasCloseBeenCalled = false;
public JsrEndpointEventDriver(WebSocketPolicy policy, Endpoint endpoint, EndpointConfig config)

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common;
import java.net.InetSocketAddress;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -57,6 +58,12 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
void disconnect();
/**
* Get the ByteBufferPool in use by the connection
* @return
*/
ByteBufferPool getBufferPool();
/**
* Get the read/write idle timeout.
*

View File

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.MultiMap;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.UrlEncoded;
@ -186,6 +187,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return true;
}
public ByteBufferPool getBufferPool()
{
return this.connection.getBufferPool();
}
public LogicalConnection getConnection()
{
return connection;
@ -286,7 +292,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
if (connection.getIOState().isInputAvailable())
{
// Forward Errors to User WebSocket Object
websocket.incomingError(t);
websocket.incomingError(t);
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
@ -34,6 +35,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
/**
* EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
@ -44,6 +46,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
protected final WebSocketPolicy policy;
protected final Object websocket;
protected WebSocketSession session;
protected MessageAppender activeMessage;
public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
{
@ -51,6 +54,17 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
this.websocket = websocket;
}
protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
{
activeMessage.appendMessage(buffer,fin);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
}
@Override
public WebSocketPolicy getPolicy()
{
@ -92,7 +106,8 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
try
{
switch (frame.getType().getOpCode())
byte opcode = frame.getType().getOpCode();
switch (opcode)
{
case OpCode.CLOSE:
{
@ -134,6 +149,15 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
onTextFrame(frame.getPayload(),frame.isFin());
return;
}
case OpCode.CONTINUATION:
{
onContinuationFrame(frame.getPayload(),frame.isFin());
return;
}
default:
{
LOG.debug("Unhandled OpCode: {}",opcode);
}
}
}
catch (NotUtf8Exception e)
@ -150,6 +174,17 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
}
}
@Override
public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (activeMessage == null)
{
throw new IOException("Out of order Continuation frame encountered");
}
appendMessage(buffer,fin);
}
@Override
public void onPong(ByteBuffer buffer)
{

View File

@ -43,6 +43,8 @@ public interface EventDriver extends IncomingFrames
public void onConnect();
public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException;
public void onError(Throwable t);
public void onFrame(Frame frame);

View File

@ -27,7 +27,6 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@ -39,7 +38,6 @@ import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
public class JettyAnnotatedEventDriver extends AbstractEventDriver
{
private final JettyAnnotatedMetadata events;
private MessageAppender activeMessage;
private boolean hasCloseBeenCalled = false;
public JettyAnnotatedEventDriver(WebSocketPolicy policy, Object websocket, JettyAnnotatedMetadata events)
@ -88,13 +86,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
}
}
activeMessage.appendMessage(buffer,fin);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
@Override
@ -187,13 +179,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
}
}
activeMessage.appendMessage(buffer,fin);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
@Override

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
@ -40,7 +39,6 @@ public class JettyListenerEventDriver extends AbstractEventDriver
{
private static final Logger LOG = Log.getLogger(JettyListenerEventDriver.class);
private final WebSocketListener listener;
private MessageAppender activeMessage;
private boolean hasCloseBeenCalled = false;
public JettyListenerEventDriver(WebSocketPolicy policy, WebSocketListener listener)
@ -57,13 +55,7 @@ public class JettyListenerEventDriver extends AbstractEventDriver
activeMessage = new SimpleBinaryMessage(this);
}
activeMessage.appendMessage(buffer,fin);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
@Override
@ -126,13 +118,7 @@ public class JettyListenerEventDriver extends AbstractEventDriver
activeMessage = new SimpleTextMessage(this);
}
activeMessage.appendMessage(buffer,fin);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
appendMessage(buffer,fin);
}
@Override
@ -144,6 +130,6 @@ public class JettyListenerEventDriver extends AbstractEventDriver
@Override
public String toString()
{
return String.format("%s[%s]", JettyListenerEventDriver.class.getSimpleName(), listener.getClass().getName());
return String.format("%s[%s]",JettyListenerEventDriver.class.getSimpleName(),listener.getClass().getName());
}
}

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -91,6 +92,13 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
// TODO: disconnect the virtual end-point?
}
@Override
public ByteBufferPool getBufferPool()
{
// TODO Auto-generated method stub
return null;
}
public long getChannelId()
{
return channelId;

View File

@ -59,7 +59,15 @@ public class FramePipes
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
this.incoming.incomingFrame(frame);
try
{
this.incoming.incomingFrame(frame);
callback.writeSuccess();
}
catch (Throwable t)
{
callback.writeFailed(t);
}
}
}

View File

@ -20,38 +20,157 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
public class MessageOutputStream extends OutputStream
{
private final WebSocketSession session;
private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
private final OutgoingFrames outgoing;
private final int bufferSize;
private long frameCount = 0;
private WebSocketFrame frame;
private ByteBuffer buffer;
private FutureWriteCallback blocker;
private boolean closed = false;
public MessageOutputStream(WebSocketSession session)
{
this.session = session;
this.outgoing = session.getOutgoingHandler();
this.bufferSize = session.getPolicy().getMaxBinaryMessageBufferSize();
this.buffer = session.getBufferPool().acquire(bufferSize,true);
BufferUtil.flipToFill(buffer);
this.frame = new WebSocketFrame(OpCode.BINARY);
}
private void assertNotClosed() throws IOException
{
if (closed)
{
throw new IOException("Stream is closed");
}
}
@Override
public void close() throws IOException
public synchronized void close() throws IOException
{
// TODO finish sending whatever in the buffer with FIN=true
// TODO or just send an empty buffer with FIN=true
assertNotClosed();
LOG.debug("close()");
// finish sending whatever in the buffer with FIN=true
flush(true);
// close stream
LOG.debug("Sent Frame Count: {}",frameCount);
closed = true;
super.close();
LOG.debug("closed");
}
@Override
public void flush() throws IOException
public synchronized void flush() throws IOException
{
// TODO flush whatever is in the buffer with FIN=false
LOG.debug("flush()");
assertNotClosed();
// flush whatever is in the buffer with FIN=false
flush(false);
super.flush();
LOG.debug("flushed");
}
/**
* Flush whatever is in the buffer.
*
* @param fin
* fin flag
* @throws IOException
*/
private synchronized void flush(boolean fin) throws IOException
{
BufferUtil.flipToFlush(buffer,0);
LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer));
frame.setPayload(buffer);
frame.setFin(fin);
blocker = new FutureWriteCallback();
outgoing.outgoingFrame(frame,blocker);
try
{
// block on write
blocker.get();
// block success
frameCount++;
frame.setOpCode(OpCode.CONTINUATION);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause != null)
{
if (cause instanceof IOException)
{
throw (IOException)cause;
}
else
{
throw new IOException(cause);
}
}
throw new IOException("Failed to flush",e);
}
catch (InterruptedException e)
{
throw new IOException("Failed to flush",e);
}
}
@Override
public void write(int b) throws IOException
public synchronized void write(byte[] b) throws IOException
{
// TODO buffer up to limit, flush once buffer reached.
this.write(b,0,b.length);
}
@Override
public synchronized void write(byte[] b, int off, int len) throws IOException
{
LOG.debug("write(byte[{}], {}, {})",b.length,off,len);
int left = len; // bytes left to write
int offset = off; // offset within provided array
while (left > 0)
{
LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer));
int space = buffer.remaining();
int size = Math.min(space,left);
buffer.put(b,offset,size);
left -= size; // decrement bytes left
if (left > 0)
{
flush(false);
}
offset += size; // increment offset
}
}
@Override
public synchronized void write(int b) throws IOException
{
assertNotClosed();
// buffer up to limit, flush once buffer reached.
buffer.put((byte)b);
if (buffer.remaining() <= 0)
{
flush(false);
}
}
}

View File

@ -21,17 +21,23 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.Writer;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.common.WebSocketSession;
public class MessageWriter extends Writer
{
private final WebSocketSession session;
private final RemoteEndpoint remote;
private final int bufferSize;
public MessageWriter(RemoteEndpoint remote, int bufferSize)
{
this.remote = remote;
this.bufferSize = bufferSize;
}
public MessageWriter(WebSocketSession session)
{
this.session = session;
this.bufferSize = session.getPolicy().getMaxTextMessageBufferSize();
this(session.getRemote(), session.getPolicy().getMaxTextMessageBufferSize());
}
@Override

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.common.io;
import java.net.InetSocketAddress;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -39,6 +41,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
{
private static final Logger LOG = Log.getLogger(LocalWebSocketConnection.class);
private final String id;
private final ByteBufferPool bufferPool;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private IncomingFrames incoming;
private IOState ioState = new IOState();
@ -51,13 +54,13 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
public LocalWebSocketConnection(String id)
{
this.id = id;
this.bufferPool = new MappedByteBufferPool();
this.ioState.addListener(this);
}
public LocalWebSocketConnection(TestName testname)
{
this.id = testname.getMethodName();
this.ioState.addListener(this);
this(testname.getMethodName());
}
@Override
@ -80,6 +83,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
LOG.debug("disconnect()");
}
@Override
public ByteBufferPool getBufferPool()
{
return this.bufferPool;
}
@Override
public long getIdleTimeout()
{

View File

@ -29,13 +29,11 @@ public class LocalWebSocketSession extends WebSocketSession
{
private String id;
private OutgoingFramesCapture outgoingCapture;
private LocalWebSocketConnection lwsconnection;
public LocalWebSocketSession(TestName testname, EventDriver driver)
{
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname));
this.id = testname.getMethodName();
this.lwsconnection = (LocalWebSocketConnection)getConnection();
outgoingCapture = new OutgoingFramesCapture();
setOutgoingHandler(outgoingCapture);
}
@ -45,13 +43,6 @@ public class LocalWebSocketSession extends WebSocketSession
return outgoingCapture;
}
@Override
public void open()
{
lwsconnection.onOpen();
super.open();
}
@Override
public String toString()
{

View File

@ -0,0 +1,60 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import java.nio.ByteBuffer;
public class MessageDebug
{
public static String toDetailHint(byte[] data, int offset, int len)
{
StringBuilder buf = new StringBuilder();
ByteBuffer buffer = ByteBuffer.wrap(data,offset,len);
buf.append("byte[").append(data.length);
buf.append("](o=").append(offset);
buf.append(",len=").append(len);
buf.append(")<<<");
for (int i = buffer.position(); i < buffer.limit(); i++)
{
char c = (char)buffer.get(i);
if ((c >= ' ') && (c <= 127))
{
buf.append(c);
}
else if ((c == '\r') || (c == '\n'))
{
buf.append('|');
}
else
{
buf.append('\ufffd');
}
if ((i == (buffer.position() + 16)) && (buffer.limit() > (buffer.position() + 32)))
{
buf.append("...");
i = buffer.limit() - 16;
}
}
buf.append(">>>");
return buf.toString();
}
}

View File

@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.util.Arrays;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.io.FramePipes;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class MessageOutputStreamTest
{
private static final Logger LOG = Log.getLogger(MessageOutputStreamTest.class);
@Rule
public TestTracker testtracker = new TestTracker();
@Rule
public TestName testname = new TestName();
private WebSocketPolicy policy;
private TrackingSocket socket;
private LocalWebSocketSession session;
@After
public void closeSession()
{
session.close();
}
@Before
public void setupSession()
{
policy = WebSocketPolicy.newServerPolicy();
policy.setInputBufferSize(1024);
policy.setMaxBinaryMessageBufferSize(1024);
// Event Driver factory
EventDriverFactory factory = new EventDriverFactory(policy);
// local socket
EventDriver driver = factory.wrap(new TrackingSocket("local"));
// remote socket
socket = new TrackingSocket("remote");
OutgoingFrames socketPipe = FramePipes.to(factory.wrap(socket));
session = new LocalWebSocketSession(testname,driver);
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// open connection
session.open();
}
@Test
public void testMultipleWrites() throws Exception
{
try (MessageOutputStream stream = new MessageOutputStream(session))
{
stream.write("Hello".getBytes("UTF-8"));
stream.write(" ".getBytes("UTF-8"));
stream.write("World".getBytes("UTF-8"));
}
Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1));
String msg = socket.messageQueue.poll();
Assert.assertThat("Message",msg,allOf(containsString("byte[11]"),containsString("Hello World")));
}
@Test
public void testSingleWrite() throws Exception
{
try (MessageOutputStream stream = new MessageOutputStream(session))
{
stream.write("Hello World".getBytes("UTF-8"));
}
Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1));
String msg = socket.messageQueue.poll();
Assert.assertThat("Message",msg,allOf(containsString("byte[11]"),containsString("Hello World")));
}
@Test
public void testWriteMultipleBuffers() throws Exception
{
int bufsize = (int)(policy.getMaxBinaryMessageBufferSize() * 2.5);
byte buf[] = new byte[bufsize];
LOG.debug("Buffer size: {}",bufsize);
Arrays.fill(buf,(byte)'x');
buf[bufsize - 1] = (byte)'o'; // mark last entry for debugging
try (MessageOutputStream stream = new MessageOutputStream(session))
{
stream.write(buf);
}
Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1));
String msg = socket.messageQueue.poll();
Assert.assertThat("Message",msg,allOf(containsString("byte[" + bufsize + "]"),containsString("xxxo>>>")));
}
}

View File

@ -0,0 +1,169 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.junit.Assert;
/**
* Testing Socket used on client side WebSocket testing.
*/
public class TrackingSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(TrackingSocket.class);
private final String id;
public int closeCode = -1;
public StringBuilder closeMessage = new StringBuilder();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public TrackingSocket()
{
this("socket");
}
public TrackingSocket(String id)
{
this.id = id;
}
public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException
{
assertCloseCode(expectedStatusCode);
assertCloseReason(expectedReason);
}
public void assertCloseCode(int expectedCode) throws InterruptedException
{
Assert.assertThat("Was Closed",closeLatch.await(50,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Close Code",closeCode,is(expectedCode));
}
private void assertCloseReason(String expectedReason)
{
Assert.assertThat("Close Reason",closeMessage.toString(),is(expectedReason));
}
public void assertIsOpen() throws InterruptedException
{
assertWasOpened();
assertNotClosed();
}
public void assertMessage(String expected)
{
String actual = messageQueue.poll();
Assert.assertEquals("Message",expected,actual);
}
public void assertNotClosed()
{
Assert.assertThat("Closed Latch",closeLatch.getCount(),greaterThanOrEqualTo(1L));
}
public void assertNotOpened()
{
Assert.assertThat("Open Latch",openLatch.getCount(),greaterThanOrEqualTo(1L));
}
public void assertWasOpened() throws InterruptedException
{
Assert.assertThat("Was Opened",openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
}
public void awaitMessage(int expectedMessageCount, TimeUnit timeoutUnit, int timeoutDuration) throws TimeoutException, InterruptedException
{
messageQueue.awaitEventCount(expectedMessageCount,timeoutDuration,timeoutUnit);
}
public void clear()
{
messageQueue.clear();
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
LOG.debug("{} onWebSocketBinary(byte[{}],{},{})",id,payload.length,offset,len);
messageQueue.offer(MessageDebug.toDetailHint(payload,offset,len));
dataLatch.countDown();
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
LOG.debug("{} onWebSocketClose({},{})",id,statusCode,reason);
super.onWebSocketClose(statusCode,reason);
closeCode = statusCode;
closeMessage.append(reason);
closeLatch.countDown();
}
@Override
public void onWebSocketConnect(Session session)
{
super.onWebSocketConnect(session);
openLatch.countDown();
}
@Override
public void onWebSocketError(Throwable cause)
{
LOG.debug("{} onWebSocketError",id,cause);
Assert.assertThat("Error capture",errorQueue.offer(cause),is(true));
}
@Override
public void onWebSocketText(String message)
{
LOG.debug("{} onWebSocketText({})",id,message);
messageQueue.offer(message);
dataLatch.countDown();
}
public void waitForClose(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Closed",closeLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForConnected(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Connected",openLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForMessage(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
LOG.debug("{} Waiting for message",id);
Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true));
}
}