Reworking stream vs simple message appending into single concept.

+ Removing StreamAppender
+ Adding MessageAppender
+ Gain fast-fail on bad UTF8 in text message
This commit is contained in:
Joakim Erdfelt 2012-08-07 12:16:53 -07:00
parent 28bc479417
commit 7ebab746fe
8 changed files with 296 additions and 232 deletions

View File

@ -23,21 +23,21 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.MessageInputStream;
import org.eclipse.jetty.websocket.io.MessageReader;
import org.eclipse.jetty.websocket.io.StreamAppender;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.io.message.MessageAppender;
import org.eclipse.jetty.websocket.io.message.MessageInputStream;
import org.eclipse.jetty.websocket.io.message.MessageReader;
import org.eclipse.jetty.websocket.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Frame;
import org.eclipse.jetty.websocket.protocol.OpCode;
@ -60,8 +60,7 @@ public class WebSocketEventDriver implements IncomingFrames
private final EventMethods events;
private final ByteBufferPool bufferPool;
private WebSocketSession session;
private ByteBuffer activeMessage;
private StreamAppender activeStream;
private MessageAppender activeMessage;
/**
* Establish the driver for the Websocket POJO
@ -100,25 +99,6 @@ public class WebSocketEventDriver implements IncomingFrames
}
}
private void appendBuffer(ByteBuffer msgBuf, ByteBuffer payloadBuf)
{
if (payloadBuf == null)
{
// nothing to do (empty payload is possible)
return;
}
if (msgBuf.remaining() < payloadBuf.remaining())
{
if (LOG.isDebugEnabled())
{
LOG.debug(" msgBuf = {}",BufferUtil.toDetailString(msgBuf));
LOG.debug("payloadBuf = {}",BufferUtil.toDetailString(msgBuf));
}
throw new MessageTooLargeException("Message exceeded maximum buffer size of [" + payloadBuf.capacity() + "]");
}
msgBuf.put(payloadBuf);
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -215,67 +195,26 @@ public class WebSocketEventDriver implements IncomingFrames
// not interested in binary events
return;
}
if (activeMessage == null)
{
if (events.onBinary.isStreaming())
{
boolean needsNotification = false;
// Streaming Approach
if (activeStream == null)
{
// Allocate directly, not via ByteBufferPool, as this buffer
// is ultimately controlled by the end user, and we can't know
// when they are done using the stream in order to release any
// buffer allocated from the ByteBufferPool.
ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
this.activeStream = new MessageInputStream(buf);
needsNotification = true;
}
activeStream.appendBuffer(frame.getPayload());
if (needsNotification)
{
events.onBinary.call(websocket,session,activeStream);
}
if (frame.isFin())
{
// close the stream.
activeStream.bufferComplete();
activeStream = null; // work with a new one
}
activeMessage = new MessageInputStream(websocket,events.onBinary,session,bufferPool,policy);
}
else
{
if (activeMessage == null)
{
// Acquire from ByteBufferPool is safe here, as the return
// from the notification is a good place to release the
// buffer.
activeMessage = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(activeMessage);
activeMessage = new SimpleBinaryMessage(websocket,events.onBinary,session,bufferPool,policy);
}
}
appendBuffer(activeMessage,frame.getPayload());
activeMessage.appendMessage(frame.getPayload());
// normal case
if (frame.isFin())
{
// Notify using simple message approach.
try
{
BufferUtil.flipToFlush(activeMessage,0);
byte buf[] = BufferUtil.toArray(activeMessage);
events.onBinary.call(websocket,session,buf,0,buf.length);
}
finally
{
bufferPool.release(activeMessage);
activeMessage.messageComplete();
activeMessage = null;
}
}
}
return;
}
case OpCode.TEXT:
@ -285,70 +224,31 @@ public class WebSocketEventDriver implements IncomingFrames
// not interested in text events
return;
}
if (events.onText.isStreaming())
{
boolean needsNotification = false;
// Streaming Approach
if (activeStream == null)
if (activeMessage == null)
{
if (events.onText.isStreaming())
{
// Allocate directly, not via ByteBufferPool, as this buffer
// is ultimately controlled by the end user, and we can't know
// when they are done using the stream in order to release any
// buffer allocated from the ByteBufferPool.
ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
this.activeStream = new MessageReader(buf);
needsNotification = true;
}
activeStream.appendBuffer(frame.getPayload());
if (needsNotification)
{
events.onText.call(websocket,session,activeStream);
}
if (frame.isFin())
{
// close the stream.
activeStream.bufferComplete();
activeStream = null; // work with a new one
}
this.activeMessage = new MessageReader(buf);
}
else
{
if (activeMessage == null)
{
// Acquire from ByteBufferPool is safe here, as the return
// from the notification is a good place to release the
// buffer.
activeMessage = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(activeMessage);
activeMessage = new SimpleTextMessage(websocket,events.onText,session,policy);
}
}
appendBuffer(activeMessage,frame.getPayload());
activeMessage.appendMessage(frame.getPayload());
// normal case
if (frame.isFin())
{
// Notify using simple message approach.
try
{
BufferUtil.flipToFlush(activeMessage,0);
byte data[] = BufferUtil.toArray(activeMessage);
Utf8StringBuilder utf = new Utf8StringBuilder();
// TODO: FIX EVIL COPY
utf.append(data,0,data.length);
events.onText.call(websocket,session,utf.toString());
}
finally
{
bufferPool.release(activeMessage);
activeMessage.messageComplete();
activeMessage = null;
}
}
}
return;
}
}

View File

@ -1,62 +0,0 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
/**
* Support class for reading binary message data as an InputStream.
*/
public class MessageInputStream extends InputStream implements StreamAppender
{
private final ByteBuffer buffer;
public MessageInputStream(ByteBuffer buf)
{
BufferUtil.clearToFill(buf);
this.buffer = buf;
}
@Override
public void appendBuffer(ByteBuffer buf)
{
// TODO Auto-generated method stub
}
@Override
public void bufferComplete() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public ByteBuffer getBuffer()
{
return buffer;
}
@Override
public int read() throws IOException
{
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -1,28 +0,0 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface StreamAppender
{
void appendBuffer(ByteBuffer byteBuffer);
void bufferComplete() throws IOException;
ByteBuffer getBuffer();
}

View File

@ -0,0 +1,11 @@
package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface MessageAppender
{
abstract void appendMessage(ByteBuffer byteBuffer) throws IOException;
abstract void messageComplete() throws IOException;
}

View File

@ -0,0 +1,114 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethod;
import org.eclipse.jetty.websocket.io.WebSocketSession;
/**
* Support class for reading binary message data as an InputStream.
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final ByteBuffer buf;
private int size;
private boolean finished;
private boolean needsNotification = true;
public MessageInputStream(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.bufferPool = bufferPool;
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(this.buf);
finished = false;
}
@Override
public void appendMessage(ByteBuffer payload) throws IOException
{
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
// empty payload is valid
return;
}
policy.assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
synchronized (buf)
{
// TODO: grow buffer till max binary message size?
BufferUtil.put(payload,buf);
}
if (needsNotification)
{
needsNotification = true;
this.onEvent.call(websocket,session,this);
}
}
@Override
public void close() throws IOException
{
super.close();
this.bufferPool.release(this.buf);
}
@Override
public void messageComplete() throws IOException
{
finished = true;
}
@Override
public int read() throws IOException
{
synchronized (buf)
{
// FIXME: HACKITY HACK HACK HACK
// Should really use its own tracking of position, to avoid flipping the
// buffer between read/write
byte b = buf.get();
if (buf.limit() <= (buf.capacity() - 5))
{
buf.compact();
}
return b;
}
}
}

View File

@ -13,7 +13,7 @@
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.io.Reader;
@ -26,7 +26,7 @@ import org.eclipse.jetty.util.BufferUtil;
* <p>
* Due to the spec, this reader is forced to use the UTF8 charset.
*/
public class MessageReader extends Reader implements StreamAppender
public class MessageReader extends Reader implements MessageAppender
{
private ByteBuffer buffer;
@ -37,14 +37,7 @@ public class MessageReader extends Reader implements StreamAppender
}
@Override
public void appendBuffer(ByteBuffer byteBuffer)
{
// TODO Auto-generated method stub
}
@Override
public void bufferComplete() throws IOException
public void appendMessage(ByteBuffer byteBuffer) throws IOException
{
// TODO Auto-generated method stub
@ -57,9 +50,10 @@ public class MessageReader extends Reader implements StreamAppender
}
@Override
public ByteBuffer getBuffer()
public void messageComplete() throws IOException
{
return buffer;
// TODO Auto-generated method stub
}
@Override

View File

@ -0,0 +1,74 @@
package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethod;
import org.eclipse.jetty.websocket.io.WebSocketSession;
public class SimpleBinaryMessage implements MessageAppender
{
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final ByteBuffer buf;
private int size;
private boolean finished;
public SimpleBinaryMessage(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.bufferPool = bufferPool;
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(this.buf);
finished = false;
}
@Override
public void appendMessage(ByteBuffer payload) throws IOException
{
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
// empty payload is valid
return;
}
policy.assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
// TODO: grow buffer till max binary message size?
BufferUtil.put(payload,buf);
}
@Override
public void messageComplete() throws IOException
{
BufferUtil.flipToFlush(this.buf,0);
finished = true;
try
{
// notify event
byte data[] = BufferUtil.toArray(this.buf);
this.onEvent.call(websocket,session,data,0,data.length);
}
finally
{
// release buffer (we are done with it now)
bufferPool.release(this.buf);
}
}
}

View File

@ -0,0 +1,61 @@
package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethod;
import org.eclipse.jetty.websocket.io.WebSocketSession;
public class SimpleTextMessage implements MessageAppender
{
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final WebSocketPolicy policy;
private final Utf8StringBuilder utf;
private int size = 0;
private boolean finished;
public SimpleTextMessage(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.policy = policy;
this.utf = new Utf8StringBuilder();
size = 0;
finished = false;
}
@Override
public void appendMessage(ByteBuffer payload) throws IOException
{
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
// empty payload is valid
return;
}
policy.assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
// allow for fast fail of BAD utf (incomplete utf will trigger on messageComplete)
this.utf.append(payload);
}
@Override
public void messageComplete() throws IOException
{
finished = true;
// notify event
this.onEvent.call(websocket,session,utf.toString());
}
}