From 6c0e24485ab8952899f0def692ae36a79f0dd68a Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 16 Jul 2012 15:49:12 -0700 Subject: [PATCH] Splitting out new WebSocketSession from WebSocketAsyncConnection to better support extensions (see diagrams) --- .../jetty/websocket/api/Extension.java | 15 ++- .../api/io/WebSocketBlockingConnection.java | 7 +- .../driver/WebSocketEventDriver.java | 2 +- .../deflate/DeflateFrameExtension.java | 13 +- .../fragment/FragmentExtension.java | 12 +- .../jetty/websocket/io/ControlFrameBytes.java | 2 +- .../jetty/websocket/io/DataFrameBytes.java | 2 +- .../jetty/websocket/io/FrameBytes.java | 4 +- .../jetty/websocket/io/OutgoingFrames.java | 3 +- .../jetty/websocket/io/RawConnection.java | 30 +---- .../io/WebSocketAsyncConnection.java | 123 +++--------------- .../jetty/websocket/io/WebSocketSession.java | 115 ++++++++++++++++ .../io/LocalWebSocketConnection.java | 54 +------- 13 files changed, 177 insertions(+), 205 deletions(-) create mode 100644 jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/Extension.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/Extension.java index 803a8decada..18df0999eab 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/Extension.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/Extension.java @@ -16,6 +16,8 @@ package org.eclipse.jetty.websocket.api; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.io.IncomingFrames; import org.eclipse.jetty.websocket.io.OutgoingFrames; import org.eclipse.jetty.websocket.protocol.ExtensionConfig; @@ -106,16 +108,21 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames * @param frame * the frame to send to the next output */ - public void nextOutput(WebSocketFrame frame) + public void nextOutput(C context, Callback callback, WebSocketFrame frame) { - nextOutgoingFrames.output(frame); + nextOutgoingFrames.output(context,callback,frame); + } + + public void nextOutputNoCallback(WebSocketFrame frame) + { + nextOutgoingFrames.output(null,new FutureCallback(),frame); } @Override - public void output(WebSocketFrame frame) + public void output(C context, Callback callback, WebSocketFrame frame) { // pass thru, un-modified - nextOutgoingFrames.output(frame); + nextOutgoingFrames.output(context,callback,frame); } public void setBufferPool(ByteBufferPool bufferPool) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java index c0ea8f74fe9..7d278aa8bcf 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketBlockingConnection.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutionException; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.io.DataFrameBytes; import org.eclipse.jetty.websocket.io.RawConnection; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; @@ -56,8 +55,7 @@ public class WebSocketBlockingConnection try { FutureCallback blocking = new FutureCallback<>(); - DataFrameBytes bytes = new DataFrameBytes<>(conn,blocking,null,frame); - this.conn.getQueue().append(bytes); + conn.output(null,blocking,frame); blocking.get(); // block till finished } catch (InterruptedException e) @@ -81,8 +79,7 @@ public class WebSocketBlockingConnection try { FutureCallback blocking = new FutureCallback<>(); - DataFrameBytes bytes = new DataFrameBytes<>(conn,blocking,null,frame); - this.conn.getQueue().append(bytes); + conn.output(null,blocking,frame); blocking.get(); // block till finished } catch (InterruptedException e) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java index 2c6ee8be659..2d21972720b 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java @@ -167,7 +167,7 @@ public class WebSocketEventDriver implements IncomingFrames { WebSocketFrame pong = new WebSocketFrame(OpCode.PONG); pong.setPayload(frame.getPayload()); - connection.write(null,new FutureCallback(),pong); + connection.output(null,new FutureCallback(),pong); break; } case BINARY: diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/deflate/DeflateFrameExtension.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/deflate/DeflateFrameExtension.java index c15f8d2308a..37ab51fd0da 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/deflate/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/deflate/DeflateFrameExtension.java @@ -21,6 +21,7 @@ import java.util.zip.Deflater; import java.util.zip.Inflater; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BadPayloadException; @@ -65,8 +66,8 @@ public class DeflateFrameExtension extends Extension case PONG: if (len > WebSocketFrame.MAX_CONTROL_PAYLOAD) { - throw new ProtocolException("Invalid control frame payload length, [" + len + "] cannot exceed [" - + WebSocketFrame.MAX_CONTROL_PAYLOAD + "]"); + throw new ProtocolException("Invalid control frame payload length, [" + len + "] cannot exceed [" + WebSocketFrame.MAX_CONTROL_PAYLOAD + + "]"); } break; } @@ -122,19 +123,19 @@ public class DeflateFrameExtension extends Extension } @Override - public void output(WebSocketFrame frame) + public void output(C context, Callback callback, WebSocketFrame frame) { if (frame.getOpCode().isControlFrame()) { // skip, cannot compress control frames. - super.output(frame); + nextOutput(context,callback,frame); return; } if (frame.getPayloadLength() < minLength) { // skip, frame too small to care compressing it. - super.output(frame); + nextOutput(context,callback,frame); return; } @@ -178,7 +179,7 @@ public class DeflateFrameExtension extends Extension frame.setPayload(out); frame.setRsv1(deflater.finished()); - nextOutput(frame); + nextOutput(context,callback,frame); // free original data buffer getBufferPool().release(data); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/fragment/FragmentExtension.java index 32571dc66cb..d26eb2324f4 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/extensions/fragment/FragmentExtension.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.extensions.fragment; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.api.Extension; import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.OpCode; @@ -27,13 +28,14 @@ public class FragmentExtension extends Extension private int maxLength = -1; private int minFragments = 1; + @Override - public void output(WebSocketFrame frame) + public void output(C context, Callback callback, WebSocketFrame frame) { if (frame.getOpCode().isControlFrame()) { // Cannot fragment Control Frames - getNextOutgoingFrames().output(frame); + nextOutput(context,callback,frame); return; } @@ -57,7 +59,7 @@ public class FragmentExtension extends Extension payload.limit(Math.min(payload.limit() + maxLength,originalLimit)); frag.setPayload(payload); - nextOutput(frag); + nextOutputNoCallback(frag); length -= maxLength; opcode = OpCode.CONTINUATION; @@ -79,7 +81,7 @@ public class FragmentExtension extends Extension frag.setFin(false); frag.setPayload(payload); - nextOutput(frag); + nextOutputNoCallback(frag); length -= fragLength; opcode = OpCode.CONTINUATION; } @@ -91,7 +93,7 @@ public class FragmentExtension extends Extension payload.limit(originalLimit); frag.setPayload(payload); - nextOutput(frag); + nextOutput(context,callback,frag); } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/ControlFrameBytes.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/ControlFrameBytes.java index ade7a98b0cb..fbbbde079f2 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/ControlFrameBytes.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/ControlFrameBytes.java @@ -26,7 +26,7 @@ public class ControlFrameBytes extends FrameBytes { private ByteBuffer buffer; - public ControlFrameBytes(RawConnection connection, Callback callback, C context, WebSocketFrame frame) + public ControlFrameBytes(WebSocketAsyncConnection connection, Callback callback, C context, WebSocketFrame frame) { super(connection,callback,context,frame); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java index 22c611a8380..5a43f07e97a 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java @@ -26,7 +26,7 @@ public class DataFrameBytes extends FrameBytes private int size; private ByteBuffer buffer; - public DataFrameBytes(RawConnection connection, Callback callback, C context, WebSocketFrame frame) + public DataFrameBytes(WebSocketAsyncConnection connection, Callback callback, C context, WebSocketFrame frame) { super(connection,callback,context,frame); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/FrameBytes.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/FrameBytes.java index 67784ae30e1..c16874fb99b 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/FrameBytes.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/FrameBytes.java @@ -25,14 +25,14 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame; public abstract class FrameBytes implements Callback, Runnable { - protected final RawConnection connection; + protected final WebSocketAsyncConnection connection; protected final Callback callback; protected final C context; protected final WebSocketFrame frame; // Task used to timeout the bytes protected volatile ScheduledFuture task; - protected FrameBytes(RawConnection connection, Callback callback, C context, WebSocketFrame frame) + protected FrameBytes(WebSocketAsyncConnection connection, Callback callback, C context, WebSocketFrame frame) { this.connection = connection; this.callback = callback; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/OutgoingFrames.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/OutgoingFrames.java index d9a7b0f2460..3f8c0b35d74 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/OutgoingFrames.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/OutgoingFrames.java @@ -1,5 +1,6 @@ package org.eclipse.jetty.websocket.io; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; /** @@ -7,5 +8,5 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame; */ public interface OutgoingFrames { - void output(WebSocketFrame frame); + void output(C context, Callback callback, WebSocketFrame frame); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java index ab05e1bd53b..590d3193e83 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java @@ -16,42 +16,22 @@ package org.eclipse.jetty.websocket.io; import java.io.IOException; -import java.util.concurrent.Executor; - -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.protocol.Generator; -import org.eclipse.jetty.websocket.protocol.Parser; -import org.eclipse.jetty.websocket.protocol.WebSocketFrame; +import java.net.InetSocketAddress; /** * Interface for working with connections in a raw way. *

* This is abstracted out to allow for common access to connection internals regardless of physical vs virtual connections. */ -public interface RawConnection extends WebSocketConnection +public interface RawConnection extends OutgoingFrames { void close() throws IOException; - void complete(FrameBytes frameBytes); + void close(int statusCode, String reason) throws IOException; void disconnect(boolean onlyOutput); - void flush(); + InetSocketAddress getRemoteAddress(); - ByteBufferPool getBufferPool(); - - Executor getExecutor(); - - Generator getGenerator(); - - Parser getParser(); - - WebSocketPolicy getPolicy(); - - FrameQueue getQueue(); - - void write(C context, Callback callback, WebSocketFrame frame); + boolean isOpen(); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index ee664fb3817..5b0012b31a9 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -39,14 +39,13 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.protocol.CloseInfo; import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.Generator; -import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.Parser; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; /** * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io */ -public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection, OutgoingFrames +public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames { static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class); private static final ThreadLocal CURRENT_CONNECTION = new ThreadLocal(); @@ -68,7 +67,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements private final WebSocketPolicy policy; private final FrameQueue queue; private List extensions; - private OutgoingFrames outgoingFramesHandler; private boolean flushing; public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) @@ -95,7 +93,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements terminateConnection(statusCode,reason); } - @Override public void complete(FrameBytes frameBytes) { synchronized (queue) @@ -134,7 +131,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements } } - @Override public void flush() { FrameBytes frameBytes = null; @@ -160,13 +156,11 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements write(buffer,this,frameBytes); } - @Override public ByteBufferPool getBufferPool() { return bufferPool; } - @Override public Executor getExecutor() { return getExecutor(); @@ -184,25 +178,21 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements return extensions; } - @Override public Generator getGenerator() { return generator; } - @Override public Parser getParser() { return parser; } - @Override public WebSocketPolicy getPolicy() { return this.policy; } - @Override public FrameQueue getQueue() { return queue; @@ -219,13 +209,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements return scheduler; } - @Override - public String getSubProtocol() - { - // TODO Auto-generated method stub - return null; - } - @Override public boolean isOpen() { @@ -268,19 +251,25 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements fillInterested(); } + /** + * Enqueue internal frame from {@link OutgoingFrames} stack for eventual write out on the physical connection. + */ @Override - public void output(WebSocketFrame frame) + public void output(C context, Callback callback, WebSocketFrame frame) { - // TODO Auto-generated method stub - } - - @Override - public void ping(C context, Callback callback, byte[] payload) throws IOException - { - WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload); - ControlFrameBytes bytes = new ControlFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.prepend(bytes); + if (frame.getOpCode().isControlFrame()) + { + ControlFrameBytes bytes = new ControlFrameBytes(this,callback,context,frame); + scheduleTimeout(bytes); + queue.prepend(bytes); + } + else + { + DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); + scheduleTimeout(bytes); + queue.append(bytes); + } + flush(); } private void read(ByteBuffer buffer) @@ -354,80 +343,4 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements LOG.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes); getEndPoint().write(frameBytes.context,frameBytes,buffer); } - - /** - * {@inheritDoc} - */ - @Override - public void write(C context, Callback callback, byte buf[], int offset, int len) throws IOException - { - if (LOG.isDebugEnabled()) - { - LOG.debug("write(context,{},byte[],{},{})",callback,offset,len); - } - if (len == 0) - { - // nothing to write - return; - } - - WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len); - DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.append(bytes); - flush(); - } - - /** - * {@inheritDoc} - */ - @Override - public void write(C context, Callback callback, ByteBuffer buffer) throws IOException - { - if (LOG.isDebugEnabled()) - { - LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer)); - } - - WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer); - DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.append(bytes); - flush(); - } - - /** - * {@inheritDoc} - */ - @Override - public void write(C context, Callback callback, String message) throws IOException - { - if (LOG.isDebugEnabled()) - { - LOG.debug("write(context,{},message.length:{})",callback,message.length()); - } - - WebSocketFrame frame = WebSocketFrame.text(message); - DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.append(bytes); - flush(); - } - - @Override - public void write(C context, Callback callback, WebSocketFrame frame) - { - if (frame.getOpCode().isControlFrame()) - { - ControlFrameBytes bytes = new ControlFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.prepend(bytes); - } - else - { - DataFrameBytes bytes = new DataFrameBytes(this,callback,context,frame); - scheduleTimeout(bytes); - queue.append(bytes); - } - } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java new file mode 100644 index 00000000000..52d57461729 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java @@ -0,0 +1,115 @@ +package org.eclipse.jetty.websocket.io; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.WebSocketConnection; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.protocol.OpCode; +import org.eclipse.jetty.websocket.protocol.WebSocketFrame; + +public class WebSocketSession implements WebSocketConnection +{ + private static final Logger LOG = Log.getLogger(WebSocketSession.class); + private RawConnection connection; + private OutgoingFrames outgoing; + private String subprotocol; + private WebSocketPolicy policy; + + @Override + public void close() throws IOException + { + connection.close(); + } + + @Override + public void close(int statusCode, String reason) throws IOException + { + connection.close(statusCode,reason); + } + + @Override + public WebSocketPolicy getPolicy() + { + return policy; + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return connection.getRemoteAddress(); + } + + @Override + public String getSubProtocol() + { + return subprotocol; + } + + @Override + public boolean isOpen() + { + return connection.isOpen(); + } + + /** + * {@inheritDoc} + */ + @Override + public void ping(C context, Callback callback, byte[] payload) throws IOException + { + WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload); + frame.setFin(true); + outgoing.output(context,callback,frame); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(C context, Callback callback, byte[] buf, int offset, int len) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("write(context,{},byte[],{},{})",callback,offset,len); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len); + frame.setFin(true); + outgoing.output(context,callback,frame); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(C context, Callback callback, ByteBuffer buffer) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer)); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer); + frame.setFin(true); + outgoing.output(context,callback,frame); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(C context, Callback callback, String message) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("write(context,{},message.length:{})",callback,message.length()); + } + WebSocketFrame frame = WebSocketFrame.text(message); + frame.setFin(true); + outgoing.output(context,callback,frame); + } +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java index 9d6c10dde48..c3471d80fa6 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java @@ -18,14 +18,10 @@ package org.eclipse.jetty.websocket.io; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.Executor; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.protocol.Generator; -import org.eclipse.jetty.websocket.protocol.Parser; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.junit.rules.TestName; @@ -58,57 +54,17 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect { } - @Override - public void complete(FrameBytes frameBytes) - { - } - @Override public void disconnect(boolean onlyOutput) { } - @Override - public void flush() - { - } - - @Override - public ByteBufferPool getBufferPool() - { - return null; - } - - @Override - public Executor getExecutor() - { - return null; - } - - @Override - public Generator getGenerator() - { - return null; - } - - @Override - public Parser getParser() - { - return null; - } - @Override public WebSocketPolicy getPolicy() { return null; } - @Override - public FrameQueue getQueue() - { - return null; - } - @Override public InetSocketAddress getRemoteAddress() { @@ -127,6 +83,11 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect return false; } + @Override + public void output(C context, Callback callback, WebSocketFrame frame) + { + } + @Override public void ping(C context, Callback callback, byte[] payload) throws IOException { @@ -152,9 +113,4 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect public void write(C context, Callback callback, String message) throws IOException { } - - @Override - public void write(C context, Callback callback, WebSocketFrame frame) - { - } }