diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java index d7c7731f071..b0985aec69c 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java @@ -2,6 +2,7 @@ package org.eclipse.jetty.websocket.api; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import org.eclipse.jetty.util.Callback; @@ -81,6 +82,13 @@ public interface WebSocketConnection */ void write(C context, Callback callback, byte buf[], int offset, int len) throws IOException; + /** + * Send a a binary message. + *

+ * NIO style with callbacks, allows for concurrent results of the write operation. + */ + void write(C context, Callback callback, ByteBuffer buffer) throws IOException; + /** * Send a series of text messages. *

diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index a03271351bd..26f49d3a307 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -341,6 +341,24 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements flush(); } + /** + * {@inheritDoc} + */ + @Override + public void write(C context, Callback callback, ByteBuffer buffer) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer)); + } + + WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer); + DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); + scheduleTimeout(bytes); + queue.append(bytes); + flush(); + } + /** * {@inheritDoc} */ diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java index e72201f00ac..fdc7d0c84b2 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java @@ -2,6 +2,7 @@ package org.eclipse.jetty.websocket.io; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.api.WebSocketConnection; @@ -64,8 +65,6 @@ public class LocalWebSocketConnection implements WebSocketConnection @Override public void ping(C context, Callback callback, byte[] payload) throws IOException { - // TODO Auto-generated method stub - } @Override @@ -79,10 +78,13 @@ public class LocalWebSocketConnection implements WebSocketConnection { } + @Override + public void write(C context, Callback callback, ByteBuffer buffer) throws IOException + { + } + @Override public void write(C context, Callback callback, String message) throws IOException { - // TODO Auto-generated method stub - } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ByteBufferAssert.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ByteBufferAssert.java new file mode 100644 index 00000000000..8e698cd0e3f --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ByteBufferAssert.java @@ -0,0 +1,43 @@ +package org.eclipse.jetty.websocket.server; + +import static org.hamcrest.Matchers.*; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; +import org.junit.Assert; + +public class ByteBufferAssert +{ + public static void assertEquals(String message, byte[] expected, byte[] actual) + { + Assert.assertThat(message + " byte[].length",actual.length,is(expected.length)); + int len = expected.length; + for (int i = 0; i < len; i++) + { + Assert.assertThat(message + " byte[" + i + "]",actual[i],is(expected[i])); + } + } + + public static void assertEquals(String message, ByteBuffer expectedBuffer, ByteBuffer actualBuffer) + { + byte expectedBytes[] = BufferUtil.toArray(expectedBuffer); + byte actualBytes[] = BufferUtil.toArray(actualBuffer); + assertEquals(message,expectedBytes,actualBytes); + } + + public static void assertEquals(String message, String expectedString, ByteBuffer actualBuffer) + { + String actualString = BufferUtil.toString(actualBuffer); + Assert.assertThat(message,expectedString,is(actualString)); + } + + public static void assertSize(String message, int expectedSize, ByteBuffer buffer) + { + if ((expectedSize == 0) && (buffer == null)) + { + return; + } + Assert.assertThat(message + " buffer.remaining",buffer.remaining(),is(expectedSize)); + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/DeflateExtensionTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/DeflateExtensionTest.java index 873333b3dce..7b4f21236cc 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/DeflateExtensionTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/DeflateExtensionTest.java @@ -5,7 +5,6 @@ import static org.hamcrest.Matchers.*; import java.util.Queue; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.WebSocketServletRFCTest.RFCServlet; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; @@ -56,7 +55,7 @@ public class DeflateExtensionTest text = text + text + text + text; text = text + text + text + text + 'X'; - client.write(FrameBuilder.text(text).asFrame()); + client.write(WebSocketFrame.text(text)); // TODO: use socket that captures frame payloads to verify fragmentation diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FragmentExtensionTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FragmentExtensionTest.java index 327bab765f5..4b743c409e9 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FragmentExtensionTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FragmentExtensionTest.java @@ -5,7 +5,6 @@ import static org.hamcrest.Matchers.*; import java.util.Queue; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.WebSocketServletRFCTest.RFCServlet; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; @@ -51,7 +50,7 @@ public class FragmentExtensionTest Assert.assertThat("Response",resp,containsString("fragment")); String msg = "Sent as a long message that should be split"; - client.write(FrameBuilder.text(msg).asFrame()); + client.write(WebSocketFrame.text(msg)); // TODO: use socket that captures frame counts to verify fragmentation diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdentityExtensionTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdentityExtensionTest.java index 1936bedbdea..eb4ae8f7356 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdentityExtensionTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdentityExtensionTest.java @@ -5,7 +5,6 @@ import static org.hamcrest.Matchers.*; import java.util.Queue; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.WebSocketServletRFCTest.RFCServlet; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; @@ -50,7 +49,7 @@ public class IdentityExtensionTest Assert.assertThat("Response",resp,containsString("identity")); - client.write(FrameBuilder.text("Hello").asFrame()); + client.write(WebSocketFrame.text("Hello")); Queue frames = client.readFrames(1,TimeUnit.MILLISECONDS,1000); WebSocketFrame frame = frames.remove(); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/UnitGenerator.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/UnitGenerator.java new file mode 100644 index 00000000000..aaba422f775 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/UnitGenerator.java @@ -0,0 +1,16 @@ +package org.eclipse.jetty.websocket.server; + +import org.eclipse.jetty.io.StandardByteBufferPool; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.generator.Generator; + +/** + * Convenience Generator. + */ +public class UnitGenerator extends Generator +{ + public UnitGenerator() + { + super(WebSocketPolicy.newServerPolicy(),new StandardByteBufferPool()); + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java index d54aa7420ac..2ae1822bd3e 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.parser.Parser; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.examples.MyEchoSocket; import org.junit.AfterClass; @@ -97,7 +96,7 @@ public class WebSocketLoadRFC6455Test WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT); // _endp=new SocketEndPoint(socket); - _generator = new Generator(policy); + _generator = new UnitGenerator(); _parser = new Parser(policy); } @@ -135,9 +134,8 @@ public class WebSocketLoadRFC6455Test String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"; for (int i = 0; i < iterations; ++i) { - WebSocketFrame txt = FrameBuilder.text(message).asFrame(); - ByteBuffer buf = ByteBuffer.allocate((message.length() * iterations) + 32); - _generator.generate(buf,txt); + WebSocketFrame txt = WebSocketFrame.text(message); + ByteBuffer buf = _generator.generate(txt); // TODO: Send it // TODO: Receive response diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java index fcf00c1cacc..37006d003b1 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java @@ -17,9 +17,8 @@ import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.generator.FrameGenerator; +import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.protocol.CloseInfo; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; @@ -98,6 +97,7 @@ public class WebSocketServletRFCTest } } + private static Generator generator = new UnitGenerator(); private static SimpleServletServer server; @BeforeClass @@ -137,15 +137,15 @@ public class WebSocketServletRFCTest WebSocketFrame bin; - bin = FrameBuilder.binary(buf1).fin(false).asFrame(); + bin = WebSocketFrame.binary(buf1).setFin(false); client.write(bin); // write buf1 (fin=false) - bin = FrameBuilder.continuation(buf2).fin(false).asFrame(); + bin = new WebSocketFrame(OpCode.CONTINUATION).setPayload(buf2).setFin(false); client.write(bin); // write buf2 (fin=false) - bin = FrameBuilder.continuation(buf3).fin(true).asFrame(); + bin = new WebSocketFrame(OpCode.CONTINUATION).setPayload(buf3).setFin(true); client.write(bin); // write buf3 (fin=true) @@ -158,9 +158,11 @@ public class WebSocketServletRFCTest int aaCount = 0; int bbCount = 0; int ccCount = 0; - byte echod[] = binmsg.getPayloadData(); - for (byte b : echod) + + ByteBuffer echod = binmsg.getPayload(); + while (echod.remaining() > 1) { + byte b = echod.get(); switch (b) { case (byte)0xAA: @@ -201,7 +203,7 @@ public class WebSocketServletRFCTest // Generate text frame String msg = "this is an echo ... cho ... ho ... o"; - client.write(FrameBuilder.text(msg).asFrame()); + client.write(WebSocketFrame.text(msg)); // Read frame (hopefully text frame) Queue frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); @@ -226,7 +228,7 @@ public class WebSocketServletRFCTest client.sendStandardRequest(); client.expectUpgradeResponse(); - client.write(FrameBuilder.text("Hello").asFrame()); + client.write(WebSocketFrame.text("Hello")); // now wait for the server to time out // should be 2 frames, the TextFrame echo, and then the Close on disconnect @@ -255,7 +257,7 @@ public class WebSocketServletRFCTest client.expectUpgradeResponse(); // Generate text frame - client.write(FrameBuilder.text("CRASH").asFrame()); + client.write(WebSocketFrame.text("CRASH")); // Read frame (hopefully close frame) Queue frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); @@ -285,9 +287,9 @@ public class WebSocketServletRFCTest int dataSize = 1024 * 100; byte buf[] = new byte[dataSize]; Arrays.fill(buf,(byte)0x44); - ByteBuffer bb = ByteBuffer.allocate(dataSize + FrameGenerator.OVERHEAD); - BufferUtil.clearToFill(bb); - FrameBuilder.binary(buf).fin(true).fill(bb); + + WebSocketFrame bin = WebSocketFrame.binary(buf).setFin(true); + ByteBuffer bb = generator.generate(bin); BufferUtil.flipToFlush(bb,0); client.writeRaw(bb); @@ -319,9 +321,9 @@ public class WebSocketServletRFCTest int dataSize = 1024 * 100; byte buf[] = new byte[dataSize]; Arrays.fill(buf,(byte)'z'); - ByteBuffer bb = ByteBuffer.allocate(dataSize + FrameGenerator.OVERHEAD); - BufferUtil.clearToFill(bb); - FrameBuilder.text().payload(buf).fin(true).fill(bb); + + WebSocketFrame text = WebSocketFrame.text().setPayload(buf).setFin(true); + ByteBuffer bb = generator.generate(text); BufferUtil.flipToFlush(bb,0); client.writeRaw(bb); @@ -350,9 +352,9 @@ public class WebSocketServletRFCTest byte buf[] = new byte[] { (byte)0xC3, 0x28 }; - ByteBuffer bb = ByteBuffer.allocate(buf.length + FrameGenerator.OVERHEAD); - BufferUtil.clearToFill(bb); - FrameBuilder.text().payload(buf).fin(true).fill(bb); + + WebSocketFrame txt = WebSocketFrame.text().setPayload(buf); + ByteBuffer bb = generator.generate(txt); BufferUtil.flipToFlush(bb,0); client.writeRaw(bb); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java index 175b57d59eb..98954f4a21c 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java @@ -7,16 +7,19 @@ import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.generator.FrameGenerator; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.protocol.CloseInfo; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; +import org.eclipse.jetty.websocket.server.ByteBufferAssert; import org.eclipse.jetty.websocket.server.SimpleServletServer; import org.eclipse.jetty.websocket.server.WebSocketServerFactory; import org.eclipse.jetty.websocket.server.WebSocketServlet; @@ -70,6 +73,16 @@ public class TestABCase5 private static SimpleServletServer server; + private static Generator laxGenerator; + + @BeforeClass + public static void initGenerators() + { + WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); + ByteBufferPool bufferPool = new StandardByteBufferPool(); + laxGenerator = new Generator(policy,bufferPool,false); + } + @BeforeClass public static void startServer() throws Exception { @@ -93,7 +106,7 @@ public class TestABCase5 client.sendStandardRequest(); client.expectUpgradeResponse(); - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); String fragment1 = "fragment1"; @@ -108,7 +121,7 @@ public class TestABCase5 client.writeRaw(buf); - ByteBuffer buf2 = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf2 = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf2); String fragment2 = "fragment2"; @@ -147,13 +160,14 @@ public class TestABCase5 client.expectUpgradeResponse(); String fragment1 = "fragment1"; - ByteBuffer frame1 = FrameBuilder.ping().fin(false).payload(fragment1.getBytes()).asByteBuffer(); - - client.writeRaw(frame1); + WebSocketFrame frame1 = WebSocketFrame.ping().setFin(false).setPayload(fragment1); + ByteBuffer buf1 = laxGenerator.generate(frame1); + client.writeRaw(buf1); String fragment2 = "fragment2"; - ByteBuffer frame2 = FrameBuilder.ping().payload(fragment2.getBytes()).asByteBuffer(); - client.writeRaw(frame2); + WebSocketFrame frame2 = WebSocketFrame.ping().setPayload(fragment2); + ByteBuffer buf2 = laxGenerator.generate(frame2); + client.writeRaw(buf2); // Read frame Queue frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); @@ -179,7 +193,7 @@ public class TestABCase5 client.sendStandardRequest(); client.expectUpgradeResponse(); - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); String fragment1 = "fragment1"; @@ -194,7 +208,7 @@ public class TestABCase5 client.writeRaw(buf); - ByteBuffer buf2 = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf2 = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf2); String fragment2 = "fragment2"; @@ -233,23 +247,20 @@ public class TestABCase5 client.expectUpgradeResponse(); String fragment1 = "fragment1"; - - ByteBuffer frame1 = FrameBuilder.pong().fin(false).payload(fragment1.getBytes()).asByteBuffer(); - - client.writeRaw(frame1); + WebSocketFrame frame1 = WebSocketFrame.pong().setFin(false).setPayload(fragment1); + ByteBuffer buf1 = laxGenerator.generate(frame1); + client.writeRaw(buf1); String fragment2 = "fragment2"; - - ByteBuffer frame2 = FrameBuilder.continuation().fin(false).payload(fragment2.getBytes()).asByteBuffer(); - - client.writeRaw(frame2); + WebSocketFrame frame2 = new WebSocketFrame(OpCode.CONTINUATION).setFin(false).setPayload(fragment2); + ByteBuffer buf2 = laxGenerator.generate(frame2); + client.writeRaw(buf2); // Read frame Queue frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); WebSocketFrame frame = frames.remove(); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); - Assert.assertThat("CloseFrame.status code",new CloseInfo(frame).getStatusCode(),is(1002)); } finally @@ -259,7 +270,6 @@ public class TestABCase5 } @Test - @Ignore("not supported in implementation yet, requires server side message aggregation") public void testCase5_3TextIn2Packets() throws Exception { BlockheadClient client = new BlockheadClient(server.getServerUri()); @@ -269,7 +279,7 @@ public class TestABCase5 client.sendStandardRequest(); client.expectUpgradeResponse(); - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); String fragment1 = "fragment1"; @@ -284,7 +294,7 @@ public class TestABCase5 client.writeRaw(buf); - ByteBuffer buf2 = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf2 = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf2); String fragment2 = "fragment2"; @@ -313,7 +323,6 @@ public class TestABCase5 } @Test - @Ignore("not supported in implementation yet, requires server side message aggregation") public void testCase5_6TextPingRemainingText() throws Exception { BlockheadClient client = new BlockheadClient(server.getServerUri()); @@ -325,7 +334,7 @@ public class TestABCase5 // Send a text packet - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); String fragment1 = "fragment1"; @@ -342,7 +351,7 @@ public class TestABCase5 // Send a ping with payload - ByteBuffer pingBuf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer pingBuf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(pingBuf); String pingPayload = "ping payload"; @@ -359,7 +368,7 @@ public class TestABCase5 // Send remaining text as continuation - ByteBuffer buf2 = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf2 = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf2); String fragment2 = "fragment2"; @@ -382,12 +391,10 @@ public class TestABCase5 ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length()); payload1.flip(); - Assert.assertArrayEquals("payloads should be equal",BufferUtil.toArray(payload1),frame.getPayloadData()); - + ByteBufferAssert.assertEquals("payloads should be equal",payload1,frame.getPayload()); frame = frames.remove(); Assert.assertThat("second frame should be text frame",frame.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat("TextFrame.payload",frame.getPayloadAsUTF8(),is(fragment1 + fragment2)); } finally @@ -397,7 +404,6 @@ public class TestABCase5 } @Test - @Ignore("not supported in implementation yet, requires server side message aggregation") public void testCase5_6TextPingRemainingTextWithBuilder() throws Exception { BlockheadClient client = new BlockheadClient(server.getServerUri()); @@ -409,26 +415,21 @@ public class TestABCase5 // Send a text packet String textPayload1 = "fragment1"; - - ByteBuffer frame1 = FrameBuilder.text().fin(false).payload(textPayload1.getBytes()).asByteBuffer(); - BufferUtil.flipToFlush(frame1,0); - client.writeRaw(frame1); + WebSocketFrame frame1 = WebSocketFrame.text().setFin(false).setPayload(textPayload1); + ByteBuffer buf1 = laxGenerator.generate(frame1); + client.writeRaw(buf1); // Send a ping with payload - String pingPayload = "ping payload"; - ByteBuffer frame2 = FrameBuilder.ping().payload(pingPayload.getBytes()).asByteBuffer(); - BufferUtil.flipToFlush(frame2,0); - - client.writeRaw(frame2); + WebSocketFrame frame2 = WebSocketFrame.ping().setPayload(pingPayload); + ByteBuffer buf2 = laxGenerator.generate(frame2); + client.writeRaw(buf2); // Send remaining text as continuation String textPayload2 = "fragment2"; - - ByteBuffer frame3 = FrameBuilder.continuation().payload(textPayload2.getBytes()).asByteBuffer(); - BufferUtil.flipToFlush(frame3,0); - - client.writeRaw(frame3); + WebSocketFrame frame3 = new WebSocketFrame(OpCode.CONTINUATION).setPayload(textPayload2); + ByteBuffer buf3 = laxGenerator.generate(frame3); + client.writeRaw(buf3); // Should be 2 frames, pong frame followed by combined echo'd text frame Queue frames = client.readFrames(2,TimeUnit.MILLISECONDS,500); @@ -439,7 +440,7 @@ public class TestABCase5 ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length()); payload1.flip(); - Assert.assertArrayEquals("payloads should be equal",BufferUtil.toArray(payload1),frame.getPayloadData()); + ByteBufferAssert.assertEquals("Payload",payload1,frame.getPayload()); frame = frames.remove(); @@ -466,7 +467,7 @@ public class TestABCase5 // Send a text packet - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); String fragment1 = "fragment"; diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java index c8b563818a0..211bd7db65c 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java @@ -16,9 +16,8 @@ import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.generator.FrameGenerator; +import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.protocol.CloseInfo; -import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.server.SimpleServletServer; @@ -145,7 +144,7 @@ public class TestABCase7_9 client.sendStandardRequest(); client.expectUpgradeResponse(); - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); // Create Close Frame manually, as we are testing the server's behavior of a bad client. @@ -178,7 +177,7 @@ public class TestABCase7_9 public void testCase7_9_XInvalidCloseStatusCodesWithReason() throws Exception { String reason = "closing time"; - + BlockheadClient client = new BlockheadClient(server.getServerUri()); try { @@ -186,15 +185,12 @@ public class TestABCase7_9 client.sendStandardRequest(); client.expectUpgradeResponse(); - ByteBuffer frame = FrameBuilder.close().mask(new byte[] - { 0x44, 0x44, 0x44, 0x44 }).asByteBuffer(); - - ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); + ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + 2); BufferUtil.clearToFill(buf); // Create Close Frame manually, as we are testing the server's behavior of a bad client. buf.put((byte)(0x80 | OpCode.CLOSE.getCode())); - buf.put((byte)(0x80 | 2 + reason.length())); + buf.put((byte)(0x80 | (2 + reason.length()))); byte mask[] = new byte[] { 0x44, 0x44, 0x44, 0x44 }; buf.put(mask); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java index a9d729753c1..81d4ced0fd9 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.parser.Parser; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; +import org.eclipse.jetty.websocket.server.UnitGenerator; import org.junit.Assert; /** @@ -83,7 +84,7 @@ public class BlockheadClient implements Parser.Listener policy = WebSocketPolicy.newClientPolicy(); bufferPool = new StandardByteBufferPool(policy.getBufferSize()); - generator = new Generator(policy); + generator = new UnitGenerator(); parser = new Parser(policy); parser.addListener(this); @@ -353,19 +354,10 @@ public class BlockheadClient implements Parser.Listener public void write(WebSocketFrame frame) throws IOException { LOG.debug("write(Frame->{})",frame); - ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false); - try - { - frame.setMask(clientmask); - BufferUtil.flipToFill(buf); - generator.generate(buf,frame); - BufferUtil.flipToFlush(buf,0); - BufferUtil.writeTo(buf,out); - } - finally - { - bufferPool.release(buf); - } + frame.setMask(clientmask); + ByteBuffer buf = generator.generate(frame); + BufferUtil.flipToFlush(buf,0); + BufferUtil.writeTo(buf,out); } public void writeRaw(ByteBuffer buf) throws IOException diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java index bf0afcd18f0..0fbba711ab7 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java @@ -3,9 +3,9 @@ package org.eclipse.jetty.websocket.server.examples.echo; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.api.io.WebSocketPing; @WebSocket public class EchoBroadcastPingSocket extends EchoBroadcastSocket @@ -13,11 +13,11 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket private static class KeepAlive extends Thread { private CountDownLatch latch; - private WebSocketPing pinger; + private WebSocketConnection conn; public KeepAlive(WebSocketConnection conn) { - this.pinger = new WebSocketPing(conn); + this.conn = conn; } @Override @@ -27,10 +27,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket { while (!latch.await(10,TimeUnit.SECONDS)) { - System.err.println("Ping " + pinger); + System.err.println("Ping"); byte data[] = new byte[] { (byte)1, (byte)2, (byte)3 }; - pinger.sendPing(data); + conn.ping(null,new FutureCallback(),data); } } catch (Exception e) diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java index 40ad4fb928f..7947dc53d25 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java @@ -1,10 +1,11 @@ package org.eclipse.jetty.websocket.server.examples.echo; import java.io.IOException; +import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; -import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; @@ -24,8 +25,15 @@ public class EchoFragmentSocket return; } - byte data[] = frame.getPayloadData(); - int half = data.length / 2; + ByteBuffer data = frame.getPayload(); + + int half = data.remaining() / 2; + + ByteBuffer buf1 = data.slice(); + ByteBuffer buf2 = data.slice(); + + buf1.limit(half); + buf2.position(half); Callback nop = new FutureCallback<>(); try @@ -33,12 +41,13 @@ public class EchoFragmentSocket switch (frame.getOpCode()) { case BINARY: - conn.write(null,nop,data,0,half); - conn.write(null,nop,data,half,data.length - half); + conn.write(null,nop,buf1); + conn.write(null,nop,buf2); break; case TEXT: - conn.write(null,nop,new String(data,0,half,StringUtil.__UTF8_CHARSET)); - conn.write(null,nop,new String(data,half,data.length - half,StringUtil.__UTF8_CHARSET)); + // NOTE: This impl is not smart enough to split on a UTF8 boundary + conn.write(null,nop,BufferUtil.toUTF8String(buf1)); + conn.write(null,nop,BufferUtil.toUTF8String(buf2)); break; } }