diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java index 14625596c18..4862b947766 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java @@ -23,21 +23,21 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception; -import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.CloseException; -import org.eclipse.jetty.websocket.api.MessageTooLargeException; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.io.IncomingFrames; -import org.eclipse.jetty.websocket.io.MessageInputStream; -import org.eclipse.jetty.websocket.io.MessageReader; -import org.eclipse.jetty.websocket.io.StreamAppender; import org.eclipse.jetty.websocket.io.WebSocketSession; +import org.eclipse.jetty.websocket.io.message.MessageAppender; +import org.eclipse.jetty.websocket.io.message.MessageInputStream; +import org.eclipse.jetty.websocket.io.message.MessageReader; +import org.eclipse.jetty.websocket.io.message.SimpleBinaryMessage; +import org.eclipse.jetty.websocket.io.message.SimpleTextMessage; import org.eclipse.jetty.websocket.protocol.CloseInfo; import org.eclipse.jetty.websocket.protocol.Frame; import org.eclipse.jetty.websocket.protocol.OpCode; @@ -60,8 +60,7 @@ public class WebSocketEventDriver implements IncomingFrames private final EventMethods events; private final ByteBufferPool bufferPool; private WebSocketSession session; - private ByteBuffer activeMessage; - private StreamAppender activeStream; + private MessageAppender activeMessage; /** * Establish the driver for the Websocket POJO @@ -100,25 +99,6 @@ public class WebSocketEventDriver implements IncomingFrames } } - private void appendBuffer(ByteBuffer msgBuf, ByteBuffer payloadBuf) - { - if (payloadBuf == null) - { - // nothing to do (empty payload is possible) - return; - } - if (msgBuf.remaining() < payloadBuf.remaining()) - { - if (LOG.isDebugEnabled()) - { - LOG.debug(" msgBuf = {}",BufferUtil.toDetailString(msgBuf)); - LOG.debug("payloadBuf = {}",BufferUtil.toDetailString(msgBuf)); - } - throw new MessageTooLargeException("Message exceeded maximum buffer size of [" + payloadBuf.capacity() + "]"); - } - msgBuf.put(payloadBuf); - } - public WebSocketPolicy getPolicy() { return policy; @@ -215,66 +195,25 @@ public class WebSocketEventDriver implements IncomingFrames // not interested in binary events return; } - if (events.onBinary.isStreaming()) + + if (activeMessage == null) { - boolean needsNotification = false; - - // Streaming Approach - if (activeStream == null) + if (events.onBinary.isStreaming()) { - // Allocate directly, not via ByteBufferPool, as this buffer - // is ultimately controlled by the end user, and we can't know - // when they are done using the stream in order to release any - // buffer allocated from the ByteBufferPool. - ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize()); - this.activeStream = new MessageInputStream(buf); - needsNotification = true; + activeMessage = new MessageInputStream(websocket,events.onBinary,session,bufferPool,policy); } - - activeStream.appendBuffer(frame.getPayload()); - - if (needsNotification) + else { - events.onBinary.call(websocket,session,activeStream); - } - - if (frame.isFin()) - { - // close the stream. - activeStream.bufferComplete(); - activeStream = null; // work with a new one + activeMessage = new SimpleBinaryMessage(websocket,events.onBinary,session,bufferPool,policy); } } - else + + activeMessage.appendMessage(frame.getPayload()); + + if (frame.isFin()) { - if (activeMessage == null) - { - // Acquire from ByteBufferPool is safe here, as the return - // from the notification is a good place to release the - // buffer. - activeMessage = bufferPool.acquire(policy.getBufferSize(),false); - BufferUtil.clearToFill(activeMessage); - } - - appendBuffer(activeMessage,frame.getPayload()); - - // normal case - if (frame.isFin()) - { - // Notify using simple message approach. - try - { - BufferUtil.flipToFlush(activeMessage,0); - byte buf[] = BufferUtil.toArray(activeMessage); - events.onBinary.call(websocket,session,buf,0,buf.length); - } - finally - { - bufferPool.release(activeMessage); - activeMessage = null; - } - } - + activeMessage.messageComplete(); + activeMessage = null; } return; } @@ -285,69 +224,30 @@ public class WebSocketEventDriver implements IncomingFrames // not interested in text events return; } - if (events.onText.isStreaming()) - { - boolean needsNotification = false; - // Streaming Approach - if (activeStream == null) + if (activeMessage == null) + { + if (events.onText.isStreaming()) { // Allocate directly, not via ByteBufferPool, as this buffer // is ultimately controlled by the end user, and we can't know // when they are done using the stream in order to release any // buffer allocated from the ByteBufferPool. ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize()); - this.activeStream = new MessageReader(buf); - needsNotification = true; + this.activeMessage = new MessageReader(buf); } - - activeStream.appendBuffer(frame.getPayload()); - - if (needsNotification) + else { - events.onText.call(websocket,session,activeStream); - } - - if (frame.isFin()) - { - // close the stream. - activeStream.bufferComplete(); - activeStream = null; // work with a new one + activeMessage = new SimpleTextMessage(websocket,events.onText,session,policy); } } - else + + activeMessage.appendMessage(frame.getPayload()); + + if (frame.isFin()) { - if (activeMessage == null) - { - // Acquire from ByteBufferPool is safe here, as the return - // from the notification is a good place to release the - // buffer. - activeMessage = bufferPool.acquire(policy.getBufferSize(),false); - BufferUtil.clearToFill(activeMessage); - } - - appendBuffer(activeMessage,frame.getPayload()); - - // normal case - if (frame.isFin()) - { - // Notify using simple message approach. - try - { - BufferUtil.flipToFlush(activeMessage,0); - byte data[] = BufferUtil.toArray(activeMessage); - Utf8StringBuilder utf = new Utf8StringBuilder(); - // TODO: FIX EVIL COPY - utf.append(data,0,data.length); - - events.onText.call(websocket,session,utf.toString()); - } - finally - { - bufferPool.release(activeMessage); - activeMessage = null; - } - } + activeMessage.messageComplete(); + activeMessage = null; } return; } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java deleted file mode 100644 index 0bec1613dc6..00000000000 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java +++ /dev/null @@ -1,62 +0,0 @@ -// ======================================================================== -// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -//======================================================================== -package org.eclipse.jetty.websocket.io; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -import org.eclipse.jetty.util.BufferUtil; - -/** - * Support class for reading binary message data as an InputStream. - */ -public class MessageInputStream extends InputStream implements StreamAppender -{ - private final ByteBuffer buffer; - - public MessageInputStream(ByteBuffer buf) - { - BufferUtil.clearToFill(buf); - this.buffer = buf; - } - - @Override - public void appendBuffer(ByteBuffer buf) - { - // TODO Auto-generated method stub - } - - @Override - public void bufferComplete() throws IOException - { - // TODO Auto-generated method stub - - } - - @Override - public ByteBuffer getBuffer() - { - return buffer; - } - - @Override - public int read() throws IOException - { - // TODO Auto-generated method stub - return 0; - } -} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java deleted file mode 100644 index 7f1f93f69f5..00000000000 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java +++ /dev/null @@ -1,28 +0,0 @@ -// ======================================================================== -// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -//======================================================================== -package org.eclipse.jetty.websocket.io; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface StreamAppender -{ - void appendBuffer(ByteBuffer byteBuffer); - - void bufferComplete() throws IOException; - - ByteBuffer getBuffer(); -} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java new file mode 100644 index 00000000000..d500f888a47 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java @@ -0,0 +1,11 @@ +package org.eclipse.jetty.websocket.io.message; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MessageAppender +{ + abstract void appendMessage(ByteBuffer byteBuffer) throws IOException; + + abstract void messageComplete() throws IOException; +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java new file mode 100644 index 00000000000..08f22071d8f --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java @@ -0,0 +1,114 @@ +// ======================================================================== +// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +//======================================================================== +package org.eclipse.jetty.websocket.io.message; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.driver.EventMethod; +import org.eclipse.jetty.websocket.io.WebSocketSession; + +/** + * Support class for reading binary message data as an InputStream. + */ +public class MessageInputStream extends InputStream implements MessageAppender +{ + private final Object websocket; + private final EventMethod onEvent; + private final WebSocketSession session; + private final ByteBufferPool bufferPool; + private final WebSocketPolicy policy; + private final ByteBuffer buf; + private int size; + private boolean finished; + private boolean needsNotification = true; + + public MessageInputStream(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy) + { + this.websocket = websocket; + this.onEvent = onEvent; + this.session = session; + this.bufferPool = bufferPool; + this.policy = policy; + this.buf = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clearToFill(this.buf); + finished = false; + } + + @Override + public void appendMessage(ByteBuffer payload) throws IOException + { + if (finished) + { + throw new IOException("Cannot append to finished buffer"); + } + + if (payload == null) + { + // empty payload is valid + return; + } + + policy.assertValidBinaryMessageSize(size + payload.remaining()); + size += payload.remaining(); + + synchronized (buf) + { + // TODO: grow buffer till max binary message size? + BufferUtil.put(payload,buf); + } + + if (needsNotification) + { + needsNotification = true; + this.onEvent.call(websocket,session,this); + } + } + + @Override + public void close() throws IOException + { + super.close(); + this.bufferPool.release(this.buf); + } + + @Override + public void messageComplete() throws IOException + { + finished = true; + } + + @Override + public int read() throws IOException + { + synchronized (buf) + { + // FIXME: HACKITY HACK HACK HACK + // Should really use its own tracking of position, to avoid flipping the + // buffer between read/write + byte b = buf.get(); + if (buf.limit() <= (buf.capacity() - 5)) + { + buf.compact(); + } + return b; + } + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java similarity index 84% rename from jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java rename to jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java index 7587e2d540d..1a49c5d7faf 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java @@ -13,7 +13,7 @@ // // You may elect to redistribute this code under either of these licenses. //======================================================================== -package org.eclipse.jetty.websocket.io; +package org.eclipse.jetty.websocket.io.message; import java.io.IOException; import java.io.Reader; @@ -26,7 +26,7 @@ import org.eclipse.jetty.util.BufferUtil; *
* Due to the spec, this reader is forced to use the UTF8 charset. */ -public class MessageReader extends Reader implements StreamAppender +public class MessageReader extends Reader implements MessageAppender { private ByteBuffer buffer; @@ -37,14 +37,7 @@ public class MessageReader extends Reader implements StreamAppender } @Override - public void appendBuffer(ByteBuffer byteBuffer) - { - // TODO Auto-generated method stub - - } - - @Override - public void bufferComplete() throws IOException + public void appendMessage(ByteBuffer byteBuffer) throws IOException { // TODO Auto-generated method stub @@ -57,9 +50,10 @@ public class MessageReader extends Reader implements StreamAppender } @Override - public ByteBuffer getBuffer() + public void messageComplete() throws IOException { - return buffer; + // TODO Auto-generated method stub + } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java new file mode 100644 index 00000000000..e9ad1ddf0ac --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java @@ -0,0 +1,74 @@ +package org.eclipse.jetty.websocket.io.message; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.driver.EventMethod; +import org.eclipse.jetty.websocket.io.WebSocketSession; + +public class SimpleBinaryMessage implements MessageAppender +{ + private final Object websocket; + private final EventMethod onEvent; + private final WebSocketSession session; + private final ByteBufferPool bufferPool; + private final WebSocketPolicy policy; + private final ByteBuffer buf; + private int size; + private boolean finished; + + public SimpleBinaryMessage(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy) + { + this.websocket = websocket; + this.onEvent = onEvent; + this.session = session; + this.bufferPool = bufferPool; + this.policy = policy; + this.buf = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clearToFill(this.buf); + finished = false; + } + + @Override + public void appendMessage(ByteBuffer payload) throws IOException + { + if (finished) + { + throw new IOException("Cannot append to finished buffer"); + } + + if (payload == null) + { + // empty payload is valid + return; + } + + policy.assertValidBinaryMessageSize(size + payload.remaining()); + size += payload.remaining(); + + // TODO: grow buffer till max binary message size? + BufferUtil.put(payload,buf); + } + + @Override + public void messageComplete() throws IOException + { + BufferUtil.flipToFlush(this.buf,0); + finished = true; + + try + { + // notify event + byte data[] = BufferUtil.toArray(this.buf); + this.onEvent.call(websocket,session,data,0,data.length); + } + finally + { + // release buffer (we are done with it now) + bufferPool.release(this.buf); + } + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java new file mode 100644 index 00000000000..b0a1dc81be0 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java @@ -0,0 +1,61 @@ +package org.eclipse.jetty.websocket.io.message; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.driver.EventMethod; +import org.eclipse.jetty.websocket.io.WebSocketSession; + +public class SimpleTextMessage implements MessageAppender +{ + private final Object websocket; + private final EventMethod onEvent; + private final WebSocketSession session; + private final WebSocketPolicy policy; + private final Utf8StringBuilder utf; + private int size = 0; + private boolean finished; + + public SimpleTextMessage(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy) + { + this.websocket = websocket; + this.onEvent = onEvent; + this.session = session; + this.policy = policy; + this.utf = new Utf8StringBuilder(); + size = 0; + finished = false; + } + + @Override + public void appendMessage(ByteBuffer payload) throws IOException + { + if (finished) + { + throw new IOException("Cannot append to finished buffer"); + } + + if (payload == null) + { + // empty payload is valid + return; + } + + policy.assertValidTextMessageSize(size + payload.remaining()); + size += payload.remaining(); + + // allow for fast fail of BAD utf (incomplete utf will trigger on messageComplete) + this.utf.append(payload); + } + + @Override + public void messageComplete() throws IOException + { + finished = true; + + // notify event + this.onEvent.call(websocket,session,utf.toString()); + } +}