Fixing mid-message ping/pong support
This commit is contained in:
parent
4aa54d244a
commit
99e7b9becc
|
@ -31,17 +31,17 @@ import org.eclipse.jetty.websocket.io.WebSocketSession;
|
|||
*/
|
||||
public class WebSocketBlockingConnection
|
||||
{
|
||||
private static class Blocker extends FutureCallback<Void>
|
||||
private static class Blocker extends FutureCallback<String>
|
||||
{
|
||||
@Override
|
||||
public void completed(Void context)
|
||||
public void completed(String context)
|
||||
{
|
||||
LOG.debug("completed({})",context);
|
||||
super.completed(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Void context, Throwable cause)
|
||||
public void failed(String context, Throwable cause)
|
||||
{
|
||||
LOG.debug("failed({},{})",context,cause);
|
||||
super.failed(context,cause);
|
||||
|
@ -55,6 +55,8 @@ public class WebSocketBlockingConnection
|
|||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(WebSocketBlockingConnection.class);
|
||||
private static final String CONTEXT_BINARY = "BLOCKING_BINARY";
|
||||
private static final String CONTEXT_TEXT = "BLOCKING_TEXT";
|
||||
private final WebSocketSession conn;
|
||||
|
||||
public WebSocketBlockingConnection(WebSocketConnection conn)
|
||||
|
@ -79,7 +81,7 @@ public class WebSocketBlockingConnection
|
|||
try
|
||||
{
|
||||
Blocker blocker = new Blocker();
|
||||
conn.write(null,blocker,data,offset,length);
|
||||
conn.write(CONTEXT_BINARY,blocker,data,offset,length);
|
||||
blocker.get(); // block till finished
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
|
@ -102,7 +104,7 @@ public class WebSocketBlockingConnection
|
|||
try
|
||||
{
|
||||
Blocker blocker = new Blocker();
|
||||
conn.write(null,blocker,message);
|
||||
conn.write(CONTEXT_TEXT,blocker,message);
|
||||
blocker.get(); // block till finished
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
|
|
|
@ -163,11 +163,19 @@ public class WebSocketEventDriver implements IncomingFrames
|
|||
}
|
||||
throw new CloseException(close.getStatusCode(),close.getReason());
|
||||
}
|
||||
case PONG:
|
||||
case PING:
|
||||
{
|
||||
WebSocketFrame pong = new WebSocketFrame(OpCode.PONG);
|
||||
pong.setPayload(frame.getPayload());
|
||||
connection.output(null,new FutureCallback<Void>(),pong);
|
||||
if (frame.getPayloadLength() > 0)
|
||||
{
|
||||
// Copy payload
|
||||
ByteBuffer pongBuf = ByteBuffer.allocate(frame.getPayloadLength());
|
||||
BufferUtil.clearToFill(pongBuf);
|
||||
BufferUtil.put(frame.getPayload(),pongBuf);
|
||||
BufferUtil.flipToFlush(pongBuf,0);
|
||||
pong.setPayload(pongBuf);
|
||||
}
|
||||
connection.output("pong",new FutureCallback<String>(),pong);
|
||||
break;
|
||||
}
|
||||
case BINARY:
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
|||
import org.eclipse.jetty.io.StandardByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
|
@ -403,10 +404,9 @@ public class TestABCase5
|
|||
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.SECONDS,1);
|
||||
WebSocketFrame frame = frames.remove();
|
||||
|
||||
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PING));
|
||||
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG));
|
||||
|
||||
ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length());
|
||||
payload1.flip();
|
||||
ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET);
|
||||
|
||||
ByteBufferAssert.assertEquals("payloads should be equal",payload1,frame.getPayload());
|
||||
frame = frames.remove();
|
||||
|
@ -434,18 +434,21 @@ public class TestABCase5
|
|||
String textPayload1 = "fragment1";
|
||||
WebSocketFrame frame1 = WebSocketFrame.text().setFin(false).setPayload(textPayload1);
|
||||
ByteBuffer buf1 = laxGenerator.generate(frame1);
|
||||
BufferUtil.flipToFlush(buf1,0);
|
||||
client.writeRaw(buf1);
|
||||
|
||||
// Send a ping with payload
|
||||
String pingPayload = "ping payload";
|
||||
WebSocketFrame frame2 = WebSocketFrame.ping().setPayload(pingPayload);
|
||||
ByteBuffer buf2 = laxGenerator.generate(frame2);
|
||||
BufferUtil.flipToFlush(buf2,0);
|
||||
client.writeRaw(buf2);
|
||||
|
||||
// Send remaining text as continuation
|
||||
String textPayload2 = "fragment2";
|
||||
WebSocketFrame frame3 = new WebSocketFrame(OpCode.CONTINUATION).setPayload(textPayload2);
|
||||
ByteBuffer buf3 = laxGenerator.generate(frame3);
|
||||
BufferUtil.flipToFlush(buf3,0);
|
||||
client.writeRaw(buf3);
|
||||
|
||||
// Should be 2 frames, pong frame followed by combined echo'd text frame
|
||||
|
@ -454,9 +457,7 @@ public class TestABCase5
|
|||
|
||||
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG));
|
||||
|
||||
ByteBuffer payload1 = ByteBuffer.allocate(pingPayload.length());
|
||||
payload1.flip();
|
||||
|
||||
ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET);
|
||||
ByteBufferAssert.assertEquals("Payload",payload1,frame.getPayload());
|
||||
|
||||
frame = frames.remove();
|
||||
|
|
|
@ -210,9 +210,10 @@ public class BlockheadClient implements IncomingFrames
|
|||
public void incoming(WebSocketFrame frame)
|
||||
{
|
||||
LOG.debug("incoming({})",frame);
|
||||
if (!incomingFrameQueue.offerLast(frame))
|
||||
WebSocketFrame copy = new WebSocketFrame(frame); // make a copy
|
||||
if (!incomingFrameQueue.offerLast(copy))
|
||||
{
|
||||
throw new RuntimeException("Unable to queue incoming frame: " + frame);
|
||||
throw new RuntimeException("Unable to queue incoming frame: " + copy);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
org.eclipse.jetty.io.LEVEL=DEBUG
|
||||
org.eclipse.jetty.io.LEVEL=INFO
|
||||
org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
|
||||
org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.generator.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue