From 3ae9548c43a213d7ddc80c623e7a15a5261ddf04 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 9 Jul 2012 14:42:59 -0700 Subject: [PATCH] Another review refact (in progress) --- .../websocket/api/WebSocketConnection.java | 9 + .../jetty/websocket/api/io/WebSocketPing.java | 3 +- .../jetty/websocket/generator/Generator.java | 20 +- .../io/WebSocketAsyncConnection.java | 201 +++++++++++++++--- .../jetty/websocket/parser/FrameParser.java | 1 + .../jetty/websocket/protocol/Frame.java | 5 + .../websocket/protocol/WebSocketFrame.java | 7 + 7 files changed, 219 insertions(+), 27 deletions(-) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java index 7713c142840..3fc34f832c6 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java @@ -65,6 +65,15 @@ public interface WebSocketConnection */ boolean isOpen(); + /** + * Send a single ping messages. + *

+ * NIO style with callbacks, allows for knowledge of successful ping send. + *

+ * Use @OnWebSocketFrame and monitor Pong frames + */ + void ping(C context, Callback callback, byte payload[]) throws IOException; + /** * Send a a binary message. *

diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java index e10978ebe3f..523a0cb8976 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java @@ -13,9 +13,10 @@ import org.eclipse.jetty.websocket.io.RawConnection; import org.eclipse.jetty.websocket.protocol.FrameBuilder; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; +@Deprecated public class WebSocketPing { - private RawConnection conn; + private Raw Connection conn; private ByteBufferPool bufferPool; private WebSocketPolicy policy; private Generator generator; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/generator/Generator.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/generator/Generator.java index 4cf3545fdd3..258e310d70e 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/generator/Generator.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/generator/Generator.java @@ -2,6 +2,7 @@ package org.eclipse.jetty.websocket.generator; import java.nio.ByteBuffer; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -38,9 +39,9 @@ public class Generator private final FrameGenerator basicGenerator; - public Generator(WebSocketPolicy policy) + public Generator(WebSocketPolicy policy, ByteBufferPool bufferPool) { - basicGenerator = new FrameGenerator(policy); + basicGenerator = new FrameGenerator(policy,bufferPool); } public ByteBuffer generate(ByteBuffer buffer, WebSocketFrame frame) @@ -57,6 +58,21 @@ public class Generator return ret; } + public ByteBuffer generate(int size, WebSocketFrame frame) + { + // NEW ONE - allocate and fill only to param size + // note: size is the buffer size to generate into, not related to the payload size + // note: size cannot be less than framing overhead + // TODO: update frame.setAvailable() to indicate how much of payload is left to be generated + return basicGenerator.generate(frame); + } + + public ByteBuffer generate(WebSocketFrame frame) + { + // NEW ONE - allocate as much as needed + return basicGenerator.generate(frame); + } + @Override public String toString() { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index 0aeaf6dcf53..8196dcfb9c4 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -3,15 +3,17 @@ package org.eclipse.jetty.websocket.io; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.InterruptedByTimeoutException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -25,6 +27,7 @@ import org.eclipse.jetty.websocket.generator.Generator; import org.eclipse.jetty.websocket.parser.Parser; import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.FrameBuilder; +import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; /** @@ -32,6 +35,148 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame; */ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection { + public class ControlFrameBytes extends FrameBytes + { + public ControlFrameBytes(C context, Callback callback, WebSocketFrame frame, ByteBuffer buffer) + { + super(context,callback,frame,buffer); + } + + @Override + public void completed(C context) { + bufferPool.release(buffer); + + super.completed(context); + + if(frame.getOpCode() == OpCode.CLOSE) + { + // TODO: close the connection (no packet) + } + } + + @Override + public ByteBuffer getByteBuffer() + { + return buffer; + } + } + + public class DataFrameBytes extends FrameBytes + { + private int size; + + public DataFrameBytes(C context, Callback callback, WebSocketFrame frame, ByteBuffer buffer) + { + super(context,callback,frame,buffer); + } + + @Override + public void completed(C context) + { + bufferPool.release(buffer); + + if (frame.available() > 0) + { + // We have written a frame out of this DataInfo, but there is more to write. + // We need to keep the correct ordering of frames, to avoid that another + // DataInfo for the same stream is written before this one is finished. + prepend(this); + } + else + { + super.completed(context); + } + } + + @Override + public ByteBuffer getByteBuffer() + { + try + { + int windowSize = policy.getBufferSize(); + // TODO: create a window size? + + size = frame.getPayloadLength(); + if (size > windowSize) + { + size = windowSize; + } + + buffer = generator.generate(size,frame); + return buffer; + } + catch (Throwable x) + { + failed(context,x); + return null; + } + } + } + + public abstract class FrameBytes implements Callback, Runnable + { + private final Callback callback; + protected final C context; + protected final WebSocketFrame frame; + protected final ByteBuffer buffer; + // Task used to timeout the bytes + protected volatile ScheduledFuture task; + + protected FrameBytes(C context, Callback callback, WebSocketFrame frame, ByteBuffer buffer) + { + this.callback = callback; + this.context = context; + this.frame = frame; + this.buffer = buffer; + } + + private void cancelTask() + { + ScheduledFuture task = this.task; + if (task != null) + { + task.cancel(false); + } + } + + @Override + public void completed(C context) + { + cancelTask(); + callback.completed(context); + } + + @Override + public void failed(C context, Throwable x) + { + cancelTask(); + callback.failed(context,x); + } + + public abstract ByteBuffer getByteBuffer(); + + @Override + public void run() + { + // If this occurs we had a timeout! + try + { + close(); + } + catch (IOException e) + { + LOG.ignore(e); + } + failed(context, new InterruptedByTimeoutException()); + } + + @Override + public String toString() + { + return frame.toString(); + } + } + private static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class); private static final ThreadLocal CURRENT_CONNECTION = new ThreadLocal(); @@ -46,21 +191,24 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements } private final ByteBufferPool bufferPool; - private Generator generator; - private Parser parser; - private WebSocketPolicy policy; + private final ScheduledExecutorService scheduler; + private final Generator generator; + private final Parser parser; + private final WebSocketPolicy policy; + // TODO: track extensions? (only those that need to operate at this level?) // TODO: are extensions going to layer the endpoint? // TODO: are extensions going to layer the connection? private List extensions; - public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, WebSocketPolicy policy, ByteBufferPool bufferPool) + public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) { super(endp,executor); this.policy = policy; this.bufferPool = bufferPool; this.generator = new Generator(policy); this.parser = new Parser(policy); + this.scheduler = scheduler; this.extensions = new ArrayList<>(); } @@ -85,7 +233,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements catch (IOException e) { terminateConnection(StatusCode.PROTOCOL,e.getMessage()); - throw new RuntimeIOException(e); + return 0; } } @@ -156,13 +304,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements LOG.debug("onFillable"); setCurrentConnection(this); ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false); - BufferUtil.clear(buffer); + BufferUtil.clearToFill(buffer); try { read(buffer); } finally { + // TODO: does fillInterested need to be called again? + fillInterested(); setCurrentConnection(null); bufferPool.release(buffer); } @@ -176,6 +326,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements fillInterested(); } + @Override + public void ping(C context, Callback callback, byte[] payload) throws IOException + { + WebSocketFrame frame = FrameBuilder.ping().payload(payload).asFrame(); + ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false); + generator.generate(buf,frame); + writeRaw(context,callback,buf); + } + private void read(ByteBuffer buffer) { while (true) @@ -264,28 +423,22 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements * {@inheritDoc} */ @Override - public void write(C context, Callback callback, String... messages) throws IOException + public void write(C context, Callback callback, String message) throws IOException { - int len = messages.length; - if (len == 0) - { - // nothing to write - return; - } if (LOG.isDebugEnabled()) { - LOG.debug("write(context,{},Strings->{})",callback,messages.length); + LOG.debug("write(context,{},message.length:{})",callback,message.length()); } - ByteBuffer raw[] = new ByteBuffer[messages.length]; - for (int i = 0; i < len; i++) - { - WebSocketFrame frame = FrameBuilder.text(messages[i]).fin(true).asFrame(); - raw[i] = bufferPool.acquire(policy.getBufferSize(),false); - BufferUtil.clearToFill(raw[i]); - generator.generate(raw[i],frame); - BufferUtil.flipToFlush(raw[i],0); - } + WebSocketFrame frame = FrameBuilder.text(message).fin(true).asFrame(); + FrameBytes bytes = new FrameBytes(callback,context,frame,buffer); + + WebSocketFrame frame = FrameBuilder.text(messages[i]).fin(true).asFrame(); + + raw[i] = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clearToFill(raw[i]); + generator.generate(raw[i],frame); + BufferUtil.flipToFlush(raw[i],0); writeRaw(context,callback,raw); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/parser/FrameParser.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/parser/FrameParser.java index b57ada528c1..8955d2b99eb 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/parser/FrameParser.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/parser/FrameParser.java @@ -113,6 +113,7 @@ public class FrameParser if (frame.isMasked()) { // Demask the content 1 byte at a time + // FIXME: on partially parsed frames this needs an offset from prior parse byte mask[] = getFrame().getMask(); for (int i = 0; i < amt; i++) { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Frame.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Frame.java index 0bc8211635a..e82690246f1 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Frame.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Frame.java @@ -2,6 +2,8 @@ package org.eclipse.jetty.websocket.protocol; /** * The immutable frame details. + *

+ * Used by end user via @OnWebSocketFrame */ public interface Frame { @@ -9,6 +11,9 @@ public interface Frame public OpCode getOpCode(); + /** + * @return a copy of the payload data + */ public byte[] getPayloadData(); public int getPayloadLength(); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java index 6ee1ce73085..4f25038bfa9 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java @@ -1,6 +1,8 @@ package org.eclipse.jetty.websocket.protocol; +import java.nio.ByteBuffer; + import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.websocket.api.ProtocolException; @@ -135,6 +137,11 @@ public class WebSocketFrame implements Frame return opcode; } + public ByteBuffer getPayload() + { + return payload.slice(); + } + public String getPayloadAsUTF8() { if (payload == null)