From 99e7b9becc0efc29f7ac275f22bdd3e74ed1f919 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 17 Jul 2012 11:50:53 -0700 Subject: [PATCH] Fixing mid-message ping/pong support --- .../api/io/WebSocketBlockingConnection.java | 12 +++++++----- .../websocket/driver/WebSocketEventDriver.java | 14 +++++++++++--- .../jetty/websocket/server/ab/TestABCase5.java | 13 +++++++------ .../server/blockhead/BlockheadClient.java | 5 +++-- .../src/test/resources/jetty-logging.properties | 2 +- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java index cc054759002..ba91a107eb2 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java @@ -31,17 +31,17 @@ import org.eclipse.jetty.websocket.io.WebSocketSession; */ public class WebSocketBlockingConnection { - private static class Blocker extends FutureCallback + private static class Blocker extends FutureCallback { @Override - public void completed(Void context) + public void completed(String context) { LOG.debug("completed({})",context); super.completed(context); } @Override - public void failed(Void context, Throwable cause) + public void failed(String context, Throwable cause) { LOG.debug("failed({},{})",context,cause); super.failed(context,cause); @@ -55,6 +55,8 @@ public class WebSocketBlockingConnection } private static final Logger LOG = Log.getLogger(WebSocketBlockingConnection.class); + private static final String CONTEXT_BINARY = "BLOCKING_BINARY"; + private static final String CONTEXT_TEXT = "BLOCKING_TEXT"; private final WebSocketSession conn; public WebSocketBlockingConnection(WebSocketConnection conn) @@ -79,7 +81,7 @@ public class WebSocketBlockingConnection try { Blocker blocker = new Blocker(); - conn.write(null,blocker,data,offset,length); + conn.write(CONTEXT_BINARY,blocker,data,offset,length); blocker.get(); // block till finished } catch (InterruptedException e) @@ -102,7 +104,7 @@ public class WebSocketBlockingConnection try { Blocker blocker = new Blocker(); - conn.write(null,blocker,message); + conn.write(CONTEXT_TEXT,blocker,message); blocker.get(); // block till finished } catch (InterruptedException e) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java index f29d2391d25..e81a31bc1eb 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java @@ -163,11 +163,19 @@ public class WebSocketEventDriver implements IncomingFrames } throw new CloseException(close.getStatusCode(),close.getReason()); } - case PONG: + case PING: { WebSocketFrame pong = new WebSocketFrame(OpCode.PONG); - pong.setPayload(frame.getPayload()); - connection.output(null,new FutureCallback(),pong); + if (frame.getPayloadLength() > 0) + { + // Copy payload + ByteBuffer pongBuf = ByteBuffer.allocate(frame.getPayloadLength()); + BufferUtil.clearToFill(pongBuf); + BufferUtil.put(frame.getPayload(),pongBuf); + BufferUtil.flipToFlush(pongBuf,0); + pong.setPayload(pongBuf); + } + connection.output("pong",new FutureCallback(),pong); break; } case BINARY: diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java index 65e3c41eb60..214827ab84b 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase5.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -403,10 +404,9 @@ public class TestABCase5 Queue frames = client.readFrames(2,TimeUnit.SECONDS,1); WebSocketFrame frame = frames.remove(); - Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PING)); + Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG)); - ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length()); - payload1.flip(); + ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET); ByteBufferAssert.assertEquals("payloads should be equal",payload1,frame.getPayload()); frame = frames.remove(); @@ -434,18 +434,21 @@ public class TestABCase5 String textPayload1 = "fragment1"; WebSocketFrame frame1 = WebSocketFrame.text().setFin(false).setPayload(textPayload1); ByteBuffer buf1 = laxGenerator.generate(frame1); + BufferUtil.flipToFlush(buf1,0); client.writeRaw(buf1); // Send a ping with payload String pingPayload = "ping payload"; WebSocketFrame frame2 = WebSocketFrame.ping().setPayload(pingPayload); ByteBuffer buf2 = laxGenerator.generate(frame2); + BufferUtil.flipToFlush(buf2,0); client.writeRaw(buf2); // Send remaining text as continuation String textPayload2 = "fragment2"; WebSocketFrame frame3 = new WebSocketFrame(OpCode.CONTINUATION).setPayload(textPayload2); ByteBuffer buf3 = laxGenerator.generate(frame3); + BufferUtil.flipToFlush(buf3,0); client.writeRaw(buf3); // Should be 2 frames, pong frame followed by combined echo'd text frame @@ -454,9 +457,7 @@ public class TestABCase5 Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG)); - ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length()); - payload1.flip(); - + ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET); ByteBufferAssert.assertEquals("Payload",payload1,frame.getPayload()); frame = frames.remove(); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java index 35e1a434bf8..93ae3baedc3 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java @@ -210,9 +210,10 @@ public class BlockheadClient implements IncomingFrames public void incoming(WebSocketFrame frame) { LOG.debug("incoming({})",frame); - if (!incomingFrameQueue.offerLast(frame)) + WebSocketFrame copy = new WebSocketFrame(frame); // make a copy + if (!incomingFrameQueue.offerLast(copy)) { - throw new RuntimeException("Unable to queue incoming frame: " + frame); + throw new RuntimeException("Unable to queue incoming frame: " + copy); } } diff --git a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties index 4e3678c550e..cf2b4d1d0d0 100644 --- a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties @@ -1,5 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.io.LEVEL=DEBUG +org.eclipse.jetty.io.LEVEL=INFO org.eclipse.jetty.io.SelectorManager.LEVEL=INFO org.eclipse.jetty.websocket.LEVEL=DEBUG org.eclipse.jetty.websocket.generator.LEVEL=DEBUG