diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index 4f4d88d2de9..1c892518f6c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -23,6 +23,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; @@ -36,9 +38,18 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; */ public class WebSocketRemoteEndpoint implements RemoteEndpoint { + private static final String PRIORMSG_ERROR = "Prior message pending, cannot start new message yet."; + /** Type of Message */ + private static final int NONE = 0; + private static final int TEXT = 1; + private static final int BINARY = 2; + private static final int CONTROL = 3; + private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class); public final LogicalConnection connection; public final OutgoingFrames outgoing; + private final ReentrantLock msgLock = new ReentrantLock(); + private final AtomicInteger msgType = new AtomicInteger(NONE); public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing) { @@ -100,89 +111,234 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public void sendBytes(ByteBuffer data) throws IOException { - connection.getIOState().assertOutputOpen(); - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data)); + try + { + msgType.set(BINARY); + connection.getIOState().assertOutputOpen(); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data)); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); + blockingWrite(frame); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); - blockingWrite(frame); } @Override public Future sendBytesByFuture(ByteBuffer data) { - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data)); + try + { + msgType.set(BINARY); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data)); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); + return sendAsyncFrame(frame); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); - return sendAsyncFrame(frame); } @Override public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException { - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast); + try + { + if (msgType.get() == TEXT) + { + throw new IllegalStateException("Prior TEXT message pending, cannot start new BINARY message yet."); + } + msgType.set(BINARY); + + if (LOG.isDebugEnabled()) + { + LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast); + blockingWrite(frame); + } + finally + { + if (isLast) + { + msgType.set(NONE); + } + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast); - blockingWrite(frame); } @Override public void sendPartialString(String fragment, boolean isLast) throws IOException { - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendPartialString({}, {})",fragment,isLast); + try + { + if (msgType.get() == BINARY) + { + throw new IllegalStateException("Prior BINARY message pending, cannot start new TEXT message yet."); + } + msgType.set(TEXT); + + if (LOG.isDebugEnabled()) + { + LOG.debug("sendPartialString({}, {})",fragment,isLast); + } + WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast); + blockingWrite(frame); + + } + finally + { + if (isLast) + { + msgType.set(NONE); + } + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast); - blockingWrite(frame); } @Override public void sendPing(ByteBuffer applicationData) throws IOException { - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData)); + try + { + msgType.set(CONTROL); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData)); + } + WebSocketFrame frame = WebSocketFrame.ping().setPayload(applicationData); + blockingWrite(frame); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.ping().setPayload(applicationData); - blockingWrite(frame); } @Override public void sendPong(ByteBuffer applicationData) throws IOException { - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData)); + try + { + msgType.set(CONTROL); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData)); + } + WebSocketFrame frame = WebSocketFrame.pong().setPayload(applicationData); + blockingWrite(frame); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - WebSocketFrame frame = WebSocketFrame.pong().setPayload(applicationData); - blockingWrite(frame); } @Override public void sendString(String text) throws IOException { - WebSocketFrame frame = WebSocketFrame.text(text); - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload())); + try + { + msgType.set(TEXT); + WebSocketFrame frame = WebSocketFrame.text(text); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload())); + } + blockingWrite(WebSocketFrame.text(text)); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - blockingWrite(WebSocketFrame.text(text)); } @Override public Future sendStringByFuture(String text) { - WebSocketFrame frame = WebSocketFrame.text(text); - if (LOG.isDebugEnabled()) + if (msgLock.tryLock()) { - LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload())); + try + { + msgType.set(BINARY); + WebSocketFrame frame = WebSocketFrame.text(text); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload())); + } + return sendAsyncFrame(frame); + } + finally + { + msgType.set(NONE); + msgLock.unlock(); + } + } + else + { + throw new IllegalStateException(PRIORMSG_ERROR); } - return sendAsyncFrame(frame); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 732ddfb25f8..c2088e9550b 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -332,6 +332,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp write(buffer); } + @Override public ByteBufferPool getBufferPool() { return bufferPool; @@ -535,7 +536,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp EndPoint endPoint = getEndPoint(); try { - while (true) + while (true) // TODO: should this honor the LogicalConnection.suspend() ? { int filled = endPoint.fill(buffer); if (filled == 0) @@ -555,6 +556,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); } parser.parse(buffer); + // TODO: has the end user application already consumed what it was given? } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index b09a6ad474f..59014789b54 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -35,6 +35,8 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; /** * Support for writing a single WebSocket TEXT message via a {@link Writer} + *

+ * Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8 */ public class MessageWriter extends Writer { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/Utf8CharBuffer.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/Utf8CharBuffer.java index bf787af1f2f..9d634fbe50a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/Utf8CharBuffer.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/Utf8CharBuffer.java @@ -71,6 +71,10 @@ public class Utf8CharBuffer extends Utf8Appendable public ByteBuffer getByteBuffer() { + // remember settings + int limit = buffer.limit(); + int position = buffer.position(); + // flip to flush buffer.limit(buffer.position()); buffer.position(0); @@ -78,6 +82,10 @@ public class Utf8CharBuffer extends Utf8Appendable // get byte buffer ByteBuffer bb = UTF8.encode(buffer); + // restor settings + buffer.limit(limit); + buffer.position(position); + return bb; } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java new file mode 100644 index 00000000000..287b78d420e --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java @@ -0,0 +1,86 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import static org.hamcrest.Matchers.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class WebSocketRemoteEndpointTest +{ + @Rule + public TestName testname = new TestName(); + + @Test + public void testTextBinaryText() throws IOException + { + LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); + OutgoingFramesCapture outgoing = new OutgoingFramesCapture(); + WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing); + conn.connect(); + conn.open(); + + // Start text message + remote.sendPartialString("Hello ",false); + + try + { + // Attempt to start Binary Message + ByteBuffer bytes = ByteBuffer.wrap(new byte[] + { 0, 1, 2 }); + remote.sendPartialBytes(bytes,false); + Assert.fail("Expected " + IllegalStateException.class.getName()); + } + catch (IllegalStateException e) + { + // Expected path + Assert.assertThat("Exception",e.getMessage(),containsString("message pending")); + } + + // End text message + remote.sendPartialString("World!",true); + } + + @Test + public void testTextPingText() throws IOException + { + LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); + OutgoingFramesCapture outgoing = new OutgoingFramesCapture(); + WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing); + conn.connect(); + conn.open(); + + // Start text message + remote.sendPartialString("Hello ",false); + + // Attempt to send Ping Message + remote.sendPing(ByteBuffer.wrap(new byte[] + { 0 })); + + // End text message + remote.sendPartialString("World!",true); + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddClientTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddClientTest.java index 2b87ff23ee0..18480e3e3d0 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddClientTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddClientTest.java @@ -44,7 +44,7 @@ public class MuxerAddClientTest // Client side physical socket LocalWebSocketConnection physical = new LocalWebSocketConnection(testname); physical.setPolicy(WebSocketPolicy.newClientPolicy()); - physical.onOpen(); + physical.open(); // Server Reader MuxDecoder serverRead = new MuxDecoder(); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddServerTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddServerTest.java index 0dfffa1e2bc..007f58b2499 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddServerTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/mux/add/MuxerAddServerTest.java @@ -48,7 +48,7 @@ public class MuxerAddServerTest // Server side physical connection LocalWebSocketConnection physical = new LocalWebSocketConnection(testname); physical.setPolicy(WebSocketPolicy.newServerPolicy()); - physical.onOpen(); + physical.open(); // Client reader MuxDecoder clientRead = new MuxDecoder(); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index 7770ed37141..3659dbe2817 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -77,6 +77,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram ioState.onCloseLocal(close); } + public void connect() + { + LOG.debug("connect()"); + ioState.onConnected(); + } + @Override public void disconnect() { @@ -92,7 +98,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram @Override public long getIdleTimeout() { - // TODO Auto-generated method stub return 0; } @@ -116,7 +121,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram @Override public long getMaxIdleTimeout() { - // TODO Auto-generated method stub return 0; } @@ -184,8 +188,9 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram } } - public void onOpen() { - LOG.debug("onOpen()"); + public void open() + { + LOG.debug("open()"); ioState.onOpened(); } @@ -202,8 +207,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram @Override public void setMaxIdleTimeout(long ms) { - // TODO Auto-generated method stub - } @Override diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/DummySocket.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/DummySocket.java new file mode 100644 index 00000000000..20e7208569c --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/DummySocket.java @@ -0,0 +1,29 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.message; + +import org.eclipse.jetty.websocket.api.WebSocketAdapter; + +/** + * Do nothing Dummy Socket, used in testing. + */ +public class DummySocket extends WebSocketAdapter +{ + /* intentionally empty */ +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java new file mode 100644 index 00000000000..a8ab20aeda2 --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -0,0 +1,105 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.message; + +import static org.hamcrest.Matchers.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; +import org.eclipse.jetty.websocket.common.events.EventDriver; +import org.eclipse.jetty.websocket.common.events.EventDriverFactory; +import org.eclipse.jetty.websocket.common.io.FramePipes; +import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class MessageInputStreamTest +{ + @Rule + public TestTracker testtracker = new TestTracker(); + + @Rule + public TestName testname = new TestName(); + + private WebSocketPolicy policy; + private TrackingInputStreamSocket socket; + private LocalWebSocketSession session; + private LocalWebSocketSession remoteSession; + + @After + public void closeSession() + { + session.close(); + remoteSession.close(); + } + + @Before + public void setupSession() + { + policy = WebSocketPolicy.newServerPolicy(); + policy.setInputBufferSize(1024); + policy.setMaxBinaryMessageBufferSize(1024); + policy.setMaxTextMessageBufferSize(1024); + + // Event Driver factory + EventDriverFactory factory = new EventDriverFactory(policy); + + // Local Socket + EventDriver localDriver = factory.wrap(new DummySocket()); + + // Remote socket & Session + socket = new TrackingInputStreamSocket("remote"); + EventDriver remoteDriver = factory.wrap(socket); + remoteSession = new LocalWebSocketSession(testname,remoteDriver); + remoteSession.open(); + OutgoingFrames socketPipe = FramePipes.to(remoteDriver); + + // Local Session + session = new LocalWebSocketSession(testname,localDriver); + + session.setPolicy(policy); + // talk to our remote socket + session.setOutgoingHandler(socketPipe); + // open connection + session.open(); + } + + @Test + @Ignore + public void testSimpleMessage() throws IOException + { + ByteBuffer data = BufferUtil.toBuffer("Hello World",StringUtil.__UTF8_CHARSET); + session.getRemote().sendBytes(data); + + Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1)); + String msg = socket.messageQueue.poll(); + Assert.assertThat("Message",msg,is("Hello World")); + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/TrackingInputStreamSocket.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/TrackingInputStreamSocket.java new file mode 100644 index 00000000000..d49fd94b4aa --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/TrackingInputStreamSocket.java @@ -0,0 +1,110 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.message; + +import static org.hamcrest.Matchers.*; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.toolchain.test.EventQueue; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.junit.Assert; + +@WebSocket +public class TrackingInputStreamSocket +{ + private static final Logger LOG = Log.getLogger(TrackingInputStreamSocket.class); + private final String id; + public int closeCode = -1; + public StringBuilder closeMessage = new StringBuilder(); + public CountDownLatch closeLatch = new CountDownLatch(1); + public EventQueue messageQueue = new EventQueue<>(); + public EventQueue errorQueue = new EventQueue<>(); + + public TrackingInputStreamSocket() + { + this("socket"); + } + + public TrackingInputStreamSocket(String id) + { + this.id = id; + } + + public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException + { + assertCloseCode(expectedStatusCode); + assertCloseReason(expectedReason); + } + + public void assertCloseCode(int expectedCode) throws InterruptedException + { + Assert.assertThat("Was Closed",closeLatch.await(50,TimeUnit.MILLISECONDS),is(true)); + Assert.assertThat("Close Code",closeCode,is(expectedCode)); + } + + private void assertCloseReason(String expectedReason) + { + Assert.assertThat("Close Reason",closeMessage.toString(),is(expectedReason)); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + LOG.debug("{} onClose({},{})",id,statusCode,reason); + closeCode = statusCode; + closeMessage.append(reason); + closeLatch.countDown(); + } + + @OnWebSocketError + public void onError(Throwable cause) + { + errorQueue.add(cause); + } + + @OnWebSocketMessage + public void onInputStream(InputStream stream) + { + LOG.debug("{} onInputStream({})",id,stream); + try + { + String msg = IO.toString(stream); + messageQueue.add(msg); + } + catch (IOException e) + { + errorQueue.add(e); + } + } + + public void waitForClose(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException + { + Assert.assertThat("Client Socket Closed",closeLatch.await(timeoutDuration,timeoutUnit),is(true)); + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/Utf8CharBufferTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/Utf8CharBufferTest.java index 2afe246a1db..1ec0eb0b8d1 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/Utf8CharBufferTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/Utf8CharBufferTest.java @@ -42,7 +42,7 @@ public class Utf8CharBufferTest @Test public void testAppendGetAppendGet() { - ByteBuffer buf = ByteBuffer.allocate(64); + ByteBuffer buf = ByteBuffer.allocate(128); Utf8CharBuffer utf = Utf8CharBuffer.wrap(buf); byte hellobytes[] = asUTF("Hello "); @@ -60,8 +60,8 @@ public class Utf8CharBufferTest @Test public void testAppendGetClearAppendGet() { - int bufsize = 64; - ByteBuffer buf = ByteBuffer.allocate(64); + int bufsize = 128; + ByteBuffer buf = ByteBuffer.allocate(bufsize); Utf8CharBuffer utf = Utf8CharBuffer.wrap(buf); int expectedSize = bufsize / 2;