From 01c9e650fba6840fdcbc035fe2d289bd0c3f8028 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Wed, 31 Oct 2012 11:07:05 -0700 Subject: [PATCH] First working TEXT echo thru the muxer (as test case) --- .../core/extensions/mux/MuxChannel.java | 2 + .../core/extensions/mux/MuxGenerator.java | 2 + .../websocket/core/extensions/mux/Muxer.java | 50 +++++++++++++++---- .../core/extensions/mux/add/MuxAddServer.java | 23 +++++++-- .../mux/op/MuxAddChannelRequest.java | 10 ++++ .../mux/op/MuxAddChannelResponse.java | 6 +++ .../core/examples/echo/AdapterEchoSocket.java | 6 +++ .../core/extensions/mux/MuxEventCapture.java | 19 +++++++ .../core/extensions/mux/MuxInjector.java | 6 ++- .../core/extensions/mux/MuxReducer.java | 1 + .../extensions/mux/add/DummyMuxAddServer.java | 50 +++++++++++-------- .../mux/add/MuxerAddServerTest.java | 14 +++++- .../core/io/LocalWebSocketConnection.java | 11 +++- 13 files changed, 162 insertions(+), 38 deletions(-) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxChannel.java index 1f95e2e8427..fed481e733d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxChannel.java @@ -254,6 +254,8 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing public void setSession(WebSocketSession session) { this.session = session; + this.incoming = session; + session.setOutgoing(this); } public void setSubProtocol(String subProtocol) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxGenerator.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxGenerator.java index 6d2ae15f7b6..2950d091c9e 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxGenerator.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxGenerator.java @@ -154,6 +154,7 @@ public class MuxGenerator public void output(C context, Callback callback, long channelId, WebSocketFrame frame) throws IOException { ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false); + BufferUtil.flipToFill(muxPayload); // start building mux payload writeChannelId(muxPayload,channelId); @@ -167,6 +168,7 @@ public class MuxGenerator // build muxed frame WebSocketFrame muxFrame = WebSocketFrame.binary(); + BufferUtil.flipToFlush(muxPayload,0); muxFrame.setPayload(muxPayload); // NOTE: the physical connection will handle masking rules for this frame. diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/Muxer.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/Muxer.java index 626acee4f52..64e09293164 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/Muxer.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/Muxer.java @@ -21,8 +21,10 @@ package org.eclipse.jetty.websocket.core.extensions.mux; import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; @@ -71,6 +73,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener private MuxGenerator generator; private MuxAddServer addServer; private MuxAddClient addClient; + /** The original request headers, used for delta encoded AddChannelRequest blocks */ + private List physicalRequestHeaders; + /** The original response headers, used for delta encoded AddChannelResponse blocks */ + private List physicalResponseHeaders; public Muxer(final WebSocketConnection connection, final OutgoingFrames outgoing) { @@ -135,6 +141,12 @@ public class Muxer implements IncomingFrames, MuxParser.Listener return physicalConnection.isOpen(); } + public String mergeHeaders(List physicalHeaders, String deltaHeaders) + { + // TODO Auto-generated method stub + return null; + } + /** * Per spec, the physical connection must be failed. *

@@ -194,17 +206,27 @@ public class Muxer implements IncomingFrames, MuxParser.Listener // submit to upgrade handshake process try { - MuxAddChannelResponse response = addServer.handshake(physicalConnection,request); - if (response == null) + String requestHandshake = BufferUtil.toUTF8String(request.getHandshake()); + if (request.isDeltaEncoded()) { - LOG.warn("AddChannelResponse is null"); - // no upgrade possible? - response = new MuxAddChannelResponse(); - response.setChannelId(request.getChannelId()); - response.setFailed(true); + // Merge original request headers out of physical connection. + requestHandshake = mergeHeaders(physicalRequestHeaders,requestHandshake); + } + String responseHandshake = addServer.handshake(channel,requestHandshake); + if (StringUtil.isNotBlank(responseHandshake)) + { + // Upgrade Success + MuxAddChannelResponse response = new MuxAddChannelResponse(); + response.setChannelId(request.getChannelId()); + response.setFailed(false); + response.setHandshake(responseHandshake); + // send response + this.generator.generate(response); + } + else + { + // TODO: trigger error? } - // send response - this.generator.generate(response); } catch (Throwable t) { @@ -369,6 +391,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener */ public void output(C context, Callback callback, long channelId, WebSocketFrame frame) throws IOException { + if (LOG.isDebugEnabled()) + { + LOG.debug("output({}, {}, {}, {})",context,callback,channelId,frame); + } generator.output(context,callback,channelId,frame); } @@ -394,4 +420,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener { this.remoteAddress = remoteAddress; } + + @Override + public String toString() + { + return String.format("Muxer[subChannels.size=%d]", channels.size()); + } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxAddServer.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxAddServer.java index 16161630b6d..06a31135f81 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxAddServer.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxAddServer.java @@ -20,14 +20,27 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add; import java.io.IOException; -import org.eclipse.jetty.websocket.core.api.WebSocketConnection; -import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest; -import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse; +import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel; +import org.eclipse.jetty.websocket.core.extensions.mux.MuxException; +import org.eclipse.jetty.websocket.core.io.WebSocketSession; /** - * Server interface, for dealing with incoming AddChannelRequest and posting of AddChannelResponse back. + * Server interface, for dealing with incoming AddChannelRequest / AddChannelResponse flows. */ public interface MuxAddServer { - MuxAddChannelResponse handshake(WebSocketConnection physicalConnection, MuxAddChannelRequest request) throws IOException; + /** + * Perform the handshake. + * + * @param channel + * the channel to attach the {@link WebSocketSession} to. + * @param requestHandshake + * the request handshake (request headers) + * @return the response handshake (the response headers) + * @throws MuxException + * if unable to handshake + * @throws IOException + * if unable to parse request headers + */ + String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException; } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelRequest.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelRequest.java index ecba1697307..06374224df7 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelRequest.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelRequest.java @@ -65,6 +65,16 @@ public class MuxAddChannelRequest implements MuxControlBlock return rsv; } + public boolean isDeltaEncoded() + { + return (enc == 1); + } + + public boolean isIdentityEncoded() + { + return (enc == 0); + } + public void setChannelId(long channelId) { this.channelId = channelId; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelResponse.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelResponse.java index 9f86fdcd46b..ca8e70f521d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelResponse.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/extensions/mux/op/MuxAddChannelResponse.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock; import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp; @@ -98,6 +99,11 @@ public class MuxAddChannelResponse implements MuxControlBlock } } + public void setHandshake(String responseHandshake) + { + setHandshake(BufferUtil.toBuffer(responseHandshake)); + } + public void setRsv(byte rsv) { this.rsv = rsv; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/examples/echo/AdapterEchoSocket.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/examples/echo/AdapterEchoSocket.java index e6ac3699868..8981f653e0c 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/examples/echo/AdapterEchoSocket.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/examples/echo/AdapterEchoSocket.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.core.examples.echo; import java.io.IOException; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.api.WebSocketAdapter; /** @@ -27,16 +29,20 @@ import org.eclipse.jetty.websocket.core.api.WebSocketAdapter; */ public class AdapterEchoSocket extends WebSocketAdapter { + private static final Logger LOG = Log.getLogger(AdapterEchoSocket.class); + @Override public void onWebSocketText(String message) { if (isNotConnected()) { + LOG.debug("WebSocket Not Connected"); return; } try { + LOG.debug("Echoing back message [{}]",message); // echo the data back getBlockingConnection().write(message); } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxEventCapture.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxEventCapture.java index b6e1e7da15b..6c3cf12a71d 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxEventCapture.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxEventCapture.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxDropChannel; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxFlowControl; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxNewChannelSlot; +import org.eclipse.jetty.websocket.core.protocol.OpCode; import org.junit.Assert; public class MuxEventCapture implements MuxParser.Listener @@ -44,6 +45,24 @@ public class MuxEventCapture implements MuxParser.Listener Assert.assertThat("Frame Count",frames.size(), is(expected)); } + public void assertHasFrame(byte opcode, long channelId, int expectedCount) + { + int actualCount = 0; + + for (MuxedFrame frame : frames) + { + if (frame.getChannelId() == channelId) + { + if (frame.getOpCode() == opcode) + { + actualCount++; + } + } + } + + Assert.assertThat("Expected Count of " + OpCode.name(opcode) + " frames on Channel ID " + channelId,actualCount,is(expectedCount)); + } + public void assertHasOp(byte opCode, int expectedCount) { int actualCount = 0; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxInjector.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxInjector.java index ef13fed5c93..d3d9170e3fe 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxInjector.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxInjector.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.websocket.core.extensions.mux; import java.io.IOException; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.io.IncomingFrames; import org.eclipse.jetty.websocket.core.io.OutgoingFrames; import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; @@ -32,6 +34,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; */ public class MuxInjector implements OutgoingFrames { + private static final Logger LOG = Log.getLogger(MuxInjector.class); private IncomingFrames incoming; private MuxGenerator generator; @@ -42,7 +45,7 @@ public class MuxInjector implements OutgoingFrames this.generator.setOutgoing(this); } - public void op(long channelId, WebSocketFrame frame) throws IOException + public void frame(long channelId, WebSocketFrame frame) throws IOException { this.generator.generate(channelId,frame); } @@ -55,6 +58,7 @@ public class MuxInjector implements OutgoingFrames @Override public void output(C context, Callback callback, WebSocketFrame frame) throws IOException { + LOG.debug("Injecting {} to {}",frame,incoming); this.incoming.incoming(frame); } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxReducer.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxReducer.java index 6a9b3bd5396..f9f46766c26 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxReducer.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/MuxReducer.java @@ -43,5 +43,6 @@ public class MuxReducer extends MuxEventCapture implements OutgoingFrames public void output(C context, Callback callback, WebSocketFrame frame) throws IOException { parser.parse(frame); + callback.completed(context); // let blocked calls know the send is complete. } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/DummyMuxAddServer.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/DummyMuxAddServer.java index ee7a0fe4486..4c5c8599189 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/DummyMuxAddServer.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/DummyMuxAddServer.java @@ -19,42 +19,52 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add; import java.io.IOException; -import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.core.api.WebSocketConnection; -import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest; -import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse; +import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.core.examples.echo.AdapterEchoSocket; +import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel; +import org.eclipse.jetty.websocket.core.extensions.mux.MuxException; +import org.eclipse.jetty.websocket.core.io.WebSocketSession; +import org.eclipse.jetty.websocket.core.io.event.EventDriver; +import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory; /** * Dummy impl of MuxAddServer */ public class DummyMuxAddServer implements MuxAddServer { + @SuppressWarnings("unused") private static final Logger LOG = Log.getLogger(DummyMuxAddServer.class); + private AdapterEchoSocket echo; + private WebSocketPolicy policy; + private EventDriverFactory eventDriverFactory; + + public DummyMuxAddServer() + { + this.policy = WebSocketPolicy.newServerPolicy(); + this.eventDriverFactory = new EventDriverFactory(policy); + this.echo = new AdapterEchoSocket(); + } @Override - public MuxAddChannelResponse handshake(WebSocketConnection physicalConnection, MuxAddChannelRequest request) throws IOException + public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException { - - MuxAddChannelResponse response = new MuxAddChannelResponse(); - response.setChannelId(request.getChannelId()); - response.setEnc((byte)0); - - StringBuilder hresp = new StringBuilder(); - hresp.append("HTTP/1.1 101 Switching Protocols\r\n"); - hresp.append("Connection: upgrade\r\n"); + StringBuilder response = new StringBuilder(); + response.append("HTTP/1.1 101 Switching Protocols\r\n"); + response.append("Connection: upgrade\r\n"); // not meaningful (per Draft 08) hresp.append("Upgrade: websocket\r\n"); // not meaningful (per Draft 08) hresp.append("Sec-WebSocket-Accept: Kgo85/8KVE8YPONSeyhgL3GwqhI=\r\n"); - hresp.append("\r\n"); + response.append("\r\n"); - ByteBuffer handshake = BufferUtil.toBuffer(hresp.toString()); - LOG.debug("Handshake: {}",BufferUtil.toDetailString(handshake)); + EventDriver websocket = this.eventDriverFactory.wrap(echo); + WebSocketSession session = new WebSocketSession(websocket,channel,channel.getPolicy(),"echo"); + channel.setSession(session); + channel.setSubProtocol("echo"); + channel.onOpen(); + session.onConnect(); - response.setHandshake(handshake); - - return response; + return response.toString(); } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxerAddServerTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxerAddServerTest.java index b2ba07356d7..fc8608c2b4a 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxerAddServerTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/extensions/mux/add/MuxerAddServerTest.java @@ -29,6 +29,8 @@ import org.eclipse.jetty.websocket.core.extensions.mux.Muxer; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse; import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection; +import org.eclipse.jetty.websocket.core.protocol.OpCode; +import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -44,16 +46,17 @@ public class MuxerAddServerTest { LocalWebSocketConnection physical = new LocalWebSocketConnection(testname); physical.setPolicy(WebSocketPolicy.newServerPolicy()); + physical.onOpen(); MuxReducer reducer = new MuxReducer(); + // Represents a server side muxer. Muxer muxer = new Muxer(physical,reducer); DummyMuxAddServer addServer = new DummyMuxAddServer(); muxer.setAddServer(addServer); MuxInjector inject = new MuxInjector(muxer); - // Trigger AddChannel StringBuilder request = new StringBuilder(); request.append("GET /echo HTTP/1.1\r\n"); @@ -71,11 +74,20 @@ public class MuxerAddServerTest inject.op(req); + // Make sure we got AddChannelResponse reducer.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1); MuxAddChannelResponse response = (MuxAddChannelResponse)reducer.getOps().pop(); Assert.assertThat("AddChannelResponse.channelId",response.getChannelId(),is(1L)); Assert.assertThat("AddChannelResponse.failed",response.isFailed(),is(false)); Assert.assertThat("AddChannelResponse.handshake",response.getHandshake(),notNullValue()); Assert.assertThat("AddChannelResponse.handshakeSize",response.getHandshakeSize(),is(57L)); + + reducer.reset(); + + // Send simple echo request + inject.frame(1,WebSocketFrame.text("Hello World")); + + // Test for echo response (is there a user echo websocket connected to the sub-channel?) + reducer.assertHasFrame(OpCode.TEXT,1L,1); } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/io/LocalWebSocketConnection.java index 6737ace5283..e69341c1892 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/io/LocalWebSocketConnection.java @@ -33,6 +33,7 @@ public class LocalWebSocketConnection implements WebSocketConnection { private final String id; private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); + private boolean open = false; public LocalWebSocketConnection() { @@ -52,16 +53,19 @@ public class LocalWebSocketConnection implements WebSocketConnection @Override public void close() { + open = false; } @Override public void close(int statusCode, String reason) { + open = false; } @Override public void disconnect() { + open = false; } @Override @@ -98,10 +102,9 @@ public class LocalWebSocketConnection implements WebSocketConnection @Override public boolean isOpen() { - return false; + return open; } - @Override public boolean isOutputClosed() { @@ -121,6 +124,10 @@ public class LocalWebSocketConnection implements WebSocketConnection // TODO Auto-generated method stub } + public void onOpen() { + open = true; + } + @Override public void ping(C context, Callback callback, byte[] payload) throws IOException {