428435 - Large streaming message fails in MessageWriter.

Fixed buffering, synchronization and callback notification in the
websocket stream implementation classes.
This commit is contained in:
Simone Bordet 2014-02-24 14:28:10 +01:00
parent d349d640c0
commit f465a13d6d
10 changed files with 653 additions and 286 deletions

View File

@ -23,7 +23,6 @@ import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
@ -103,7 +102,7 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
if (activeMessage == null)
{
LOG.debug("Binary Message InputStream");
final MessageInputStream stream = new MessageInputStream(session.getConnection());
final MessageInputStream stream = new MessageInputStream();
activeMessage = stream;
// Always dispatch streaming read to another thread.
@ -311,7 +310,7 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
{
LOG.debug("Text Message Writer");
final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection()));
final MessageReader stream = new MessageReader(new MessageInputStream());
activeMessage = stream;
// Always dispatch streaming read to another thread.

View File

@ -23,7 +23,6 @@ import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
@ -86,7 +85,7 @@ public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements Ev
}
else if (wrapper.wantsStreams())
{
final MessageInputStream stream = new MessageInputStream(session.getConnection());
final MessageInputStream stream = new MessageInputStream();
activeMessage = stream;
dispatch(new Runnable()
{
@ -181,7 +180,7 @@ public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements Ev
}
else if (wrapper.wantsStreams())
{
final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection()));
final MessageReader stream = new MessageReader(new MessageInputStream());
activeMessage = stream;
dispatch(new Runnable()

View File

@ -0,0 +1,177 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BinaryStreamTest
{
private static final String PATH = "/echo";
private Server server;
private ServerConnector connector;
private WebSocketContainer wsClient;
@Before
public void prepare() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
ServerContainer container = WebSocketServerContainerInitializer.configureContext(context);
ServerEndpointConfig config = ServerEndpointConfig.Builder.create(ServerBinaryStreamer.class, PATH).build();
container.addEndpoint(config);
server.start();
wsClient = ContainerProvider.getWebSocketContainer();
server.addBean(wsClient, true);
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testEchoWithMediumMessage() throws Exception
{
testEcho(1024);
}
@Test
public void testLargestMessage() throws Exception
{
testEcho(wsClient.getDefaultMaxBinaryMessageBufferSize());
}
private void testEcho(int size) throws Exception
{
byte[] data = randomBytes(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);
ClientBinaryStreamer client = new ClientBinaryStreamer();
Session session = wsClient.connectToServer(client, uri);
try (OutputStream output = session.getBasicRemote().getSendStream())
{
output.write(data);
}
Assert.assertTrue(client.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, client.getEcho());
}
@Test
public void testMoreThanLargestMessageOneByteAtATime() throws Exception
{
int size = wsClient.getDefaultMaxBinaryMessageBufferSize() + 16;
byte[] data = randomBytes(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);
ClientBinaryStreamer client = new ClientBinaryStreamer();
Session session = wsClient.connectToServer(client, uri);
try (OutputStream output = session.getBasicRemote().getSendStream())
{
for (int i = 0; i < size; ++i)
output.write(data[i]);
}
Assert.assertTrue(client.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, client.getEcho());
}
private byte[] randomBytes(int size)
{
byte[] data = new byte[size];
new Random().nextBytes(data);
return data;
}
@ClientEndpoint
public static class ClientBinaryStreamer
{
private final CountDownLatch latch = new CountDownLatch(1);
private final ByteArrayOutputStream output = new ByteArrayOutputStream();
@OnMessage
public void echoed(InputStream input) throws IOException
{
while (true)
{
int read = input.read();
if (read < 0)
break;
output.write(read);
}
latch.countDown();
}
public byte[] getEcho()
{
return output.toByteArray();
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
return latch.await(timeout, unit);
}
}
@ServerEndpoint(PATH)
public static class ServerBinaryStreamer
{
@OnMessage
public void echo(Session session, InputStream input) throws IOException
{
byte[] buffer = new byte[128];
try (OutputStream output = session.getBasicRemote().getSendStream())
{
int read;
while ((read = input.read(buffer)) >= 0)
output.write(buffer, 0, read);
}
}
}
}

View File

@ -0,0 +1,179 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TextStreamTest
{
private static final String PATH = "/echo";
private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
private Server server;
private ServerConnector connector;
private WebSocketContainer wsClient;
@Before
public void prepare() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
ServerContainer container = WebSocketServerContainerInitializer.configureContext(context);
ServerEndpointConfig config = ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build();
container.addEndpoint(config);
server.start();
wsClient = ContainerProvider.getWebSocketContainer();
server.addBean(wsClient, true);
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testEchoWithMediumMessage() throws Exception
{
testEcho(1024);
}
@Test
public void testLargestMessage() throws Exception
{
testEcho(wsClient.getDefaultMaxBinaryMessageBufferSize());
}
private void testEcho(int size) throws Exception
{
char[] data = randomChars(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);
ClientTextStreamer client = new ClientTextStreamer();
Session session = wsClient.connectToServer(client, uri);
try (Writer output = session.getBasicRemote().getSendWriter())
{
output.write(data);
}
Assert.assertTrue(client.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, client.getEcho());
}
@Test
public void testMoreThanLargestMessageOneByteAtATime() throws Exception
{
int size = wsClient.getDefaultMaxBinaryMessageBufferSize() + 16;
char[] data = randomChars(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);
ClientTextStreamer client = new ClientTextStreamer();
Session session = wsClient.connectToServer(client, uri);
try (Writer output = session.getBasicRemote().getSendWriter())
{
for (int i = 0; i < size; ++i)
output.write(data[i]);
}
Assert.assertTrue(client.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, client.getEcho());
}
private char[] randomChars(int size)
{
char[] data = new char[size];
Random random = new Random();
for (int i = 0; i < data.length; ++i)
data[i] = CHARS.charAt(random.nextInt(CHARS.length()));
return data;
}
@ClientEndpoint
public static class ClientTextStreamer
{
private final CountDownLatch latch = new CountDownLatch(1);
private final StringBuilder output = new StringBuilder();
@OnMessage
public void echoed(Reader input) throws IOException
{
while (true)
{
int read = input.read();
if (read < 0)
break;
output.append((char)read);
}
latch.countDown();
}
public char[] getEcho()
{
return output.toString().toCharArray();
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
return latch.await(timeout, unit);
}
}
@ServerEndpoint(PATH)
public static class ServerTextStreamer
{
@OnMessage
public void echo(Session session, Reader input) throws IOException
{
char[] buffer = new char[128];
try (Writer output = session.getBasicRemote().getSendWriter())
{
int read;
while ((read = input.read(buffer)) >= 0)
output.write(buffer, 0, read);
}
}
}
}

View File

@ -79,7 +79,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{
if (events.onBinary.isStreaming())
{
activeMessage = new MessageInputStream(session.getConnection());
activeMessage = new MessageInputStream();
final MessageAppender msg = activeMessage;
dispatch(new Runnable()
{
@ -181,7 +181,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{
if (events.onText.isStreaming())
{
activeMessage = new MessageReader(new MessageInputStream(session.getConnection()));
activeMessage = new MessageReader(new MessageInputStream());
final MessageAppender msg = activeMessage;
dispatch(new Runnable()
{

View File

@ -29,30 +29,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.LogicalConnection;
/**
* Support class for reading a (single) WebSocket BINARY message via a InputStream.
* <p>
* <p/>
* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
// EOF (End of Buffers)
private final static ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
private static final ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private AtomicBoolean closed = new AtomicBoolean(false);
private final long timeoutMs;
private ByteBuffer activeBuffer = null;
public MessageInputStream(LogicalConnection connection)
public MessageInputStream()
{
this(connection, -1);
this(-1);
}
public MessageInputStream(LogicalConnection connection, int timeoutMs)
public MessageInputStream(int timeoutMs)
{
this.timeoutMs = timeoutMs;
}
@ -61,9 +59,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("appendMessage(ByteBuffer,{}): {}",fin,BufferUtil.toDetailString(framePayload));
}
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload));
// If closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
@ -106,9 +102,9 @@ public class MessageInputStream extends InputStream implements MessageAppender
}
@Override
public synchronized void mark(int readlimit)
public void mark(int readlimit)
{
/* do nothing */
// Not supported.
}
@Override
@ -120,35 +116,34 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public void messageComplete()
{
LOG.debug("messageComplete()");
// toss an empty ByteBuffer into queue to let it drain
LOG.debug("Message completed");
buffers.offer(EOF);
}
@Override
public int read() throws IOException
{
LOG.debug("read()");
try
{
if (closed.get())
{
LOG.debug("Stream closed");
return -1;
}
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("Waiting {} ms to read", timeoutMs);
if (timeoutMs < 0)
{
// infinite take
// Wait forever until a buffer is available.
activeBuffer = buffers.take();
}
else
{
// timeout specific
// Wait at most for the given timeout.
activeBuffer = buffers.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (activeBuffer == null)
{
@ -158,24 +153,27 @@ public class MessageInputStream extends InputStream implements MessageAppender
if (activeBuffer == EOF)
{
LOG.debug("Reached EOF");
// Be sure that this stream cannot be reused.
closed.set(true);
// Removed buffers that may have remained in the queue.
buffers.clear();
return -1;
}
}
return activeBuffer.get();
return activeBuffer.get() & 0xFF;
}
catch (InterruptedException e)
catch (InterruptedException x)
{
LOG.warn(e);
LOG.debug("Interrupted while waiting to read", x);
closed.set(true);
return -1;
// throw new IOException(e);
}
}
@Override
public synchronized void reset() throws IOException
public void reset() throws IOException
{
throw new IOException("reset() not supported");
}

View File

@ -39,14 +39,20 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
public class MessageOutputStream extends OutputStream
{
private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
private final OutgoingFrames outgoing;
private final ByteBufferPool bufferPool;
private final BlockingWriteCallback blocker;
private long frameCount = 0;
private long frameCount;
private BinaryFrame frame;
private ByteBuffer buffer;
private WriteCallback callback;
private boolean closed = false;
private boolean closed;
public MessageOutputStream(WebSocketSession session)
{
this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool());
}
public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
{
@ -58,164 +64,152 @@ public class MessageOutputStream extends OutputStream
this.frame = new BinaryFrame();
}
public MessageOutputStream(WebSocketSession session)
@Override
public void write(byte[] bytes, int off, int len) throws IOException
{
this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool());
try
{
send(bytes, off, len);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
private void assertNotClosed() throws IOException
@Override
public void write(int b) throws IOException
{
try
{
send(new byte[]{(byte)b}, 0, 1);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void flush() throws IOException
{
try
{
flush(false);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void close() throws IOException
{
try
{
flush(true);
bufferPool.release(buffer);
LOG.debug("Stream closed, {} frames sent", frameCount);
// Notify without holding locks.
notifySuccess();
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
private void flush(boolean fin) throws IOException
{
synchronized (this)
{
if (closed)
{
IOException e = new IOException("Stream is closed");
notifyFailure(e);
throw e;
}
}
throw new IOException("Stream is closed");
@Override
public synchronized void close() throws IOException
{
assertNotClosed();
LOG.debug("close()");
closed = fin;
// finish sending whatever in the buffer with FIN=true
flush(true);
// close stream
LOG.debug("Sent Frame Count: {}",frameCount);
closed = true;
try
{
if (callback != null)
{
callback.writeSuccess();
}
super.close();
bufferPool.release(buffer);
LOG.debug("closed");
}
catch (IOException e)
{
notifyFailure(e);
throw e;
}
}
@Override
public synchronized void flush() throws IOException
{
LOG.debug("flush()");
assertNotClosed();
// flush whatever is in the buffer with FIN=false
flush(false);
try
{
super.flush();
LOG.debug("flushed");
}
catch (IOException e)
{
notifyFailure(e);
throw e;
}
}
/**
* Flush whatever is in the buffer.
*
* @param fin
* fin flag
* @throws IOException
*/
private synchronized void flush(boolean fin) throws IOException
{
BufferUtil.flipToFlush(buffer, 0);
LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
frame.setPayload(buffer);
frame.setFin(fin);
try
{
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
// block on write
blocker.block();
// block success
frameCount++;
++frameCount;
// Any flush after the first will be a CONTINUATION frame.
frame.setIsContinuation();
BufferUtil.flipToFill(buffer);
}
catch (IOException e)
{
notifyFailure(e);
throw e;
}
}
private void notifyFailure(IOException e)
private void send(byte[] bytes, int offset, int length) throws IOException
{
if (callback != null)
synchronized (this)
{
callback.writeFailed(e);
if (closed)
throw new IOException("Stream is closed");
while (length > 0)
{
// There may be no space available, we want
// to handle correctly when space == 0.
int space = buffer.remaining();
int size = Math.min(space, length);
buffer.put(bytes, offset, size);
offset += size;
length -= size;
if (length > 0)
{
// If we could not write everything, it means
// that the buffer was full, so flush it.
flush(false);
}
}
}
}
public void setCallback(WriteCallback callback)
{
synchronized (this)
{
this.callback = callback;
}
@Override
public synchronized void write(byte[] b) throws IOException
{
try
{
this.write(b,0,b.length);
}
catch (IOException e)
private void notifySuccess()
{
notifyFailure(e);
throw e;
WriteCallback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.writeSuccess();
}
}
@Override
public synchronized void write(byte[] b, int off, int len) throws IOException
private void notifyFailure(Throwable failure)
{
LOG.debug("write(byte[{}], {}, {})",b.length,off,len);
int left = len; // bytes left to write
int offset = off; // offset within provided array
while (left > 0)
WriteCallback callback;
synchronized (this)
{
if (LOG.isDebugEnabled())
callback = this.callback;
}
if (callback != null)
{
LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer));
}
int space = buffer.remaining();
assert (space > 0);
int size = Math.min(space,left);
buffer.put(b,offset,size);
assert (size > 0);
left -= size; // decrement bytes left
if (left > 0)
{
flush(false);
}
offset += size; // increment offset
}
}
@Override
public synchronized void write(int b) throws IOException
{
assertNotClosed();
// buffer up to limit, flush once buffer reached.
buffer.put((byte)b);
if (buffer.remaining() <= 0)
{
flush(false);
callback.writeFailed(failure);
}
}
}

View File

@ -21,11 +21,12 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* Support class for reading a (single) WebSocket TEXT message via a Reader.
* <p>
* <p/>
* In compliance to the WebSocket spec, this reader always uses the UTF8 {@link Charset}.
*/
public class MessageReader extends InputStreamReader implements MessageAppender

View File

@ -35,21 +35,27 @@ import org.eclipse.jetty.websocket.common.frames.TextFrame;
/**
* Support for writing a single WebSocket TEXT message via a {@link Writer}
* <p>
* <p/>
* Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8
*/
public class MessageWriter extends Writer
{
private static final Logger LOG = Log.getLogger(MessageWriter.class);
private final OutgoingFrames outgoing;
private final ByteBufferPool bufferPool;
private final BlockingWriteCallback blocker;
private long frameCount = 0;
private long frameCount;
private TextFrame frame;
private ByteBuffer buffer;
private Utf8CharBuffer utf;
private WriteCallback callback;
private boolean closed = false;
private boolean closed;
public MessageWriter(WebSocketSession session)
{
this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool());
}
public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
{
@ -58,142 +64,156 @@ public class MessageWriter extends Writer
this.blocker = new BlockingWriteCallback();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(buffer);
this.utf = Utf8CharBuffer.wrap(buffer);
this.frame = new TextFrame();
}
public MessageWriter(WebSocketSession session)
{
this(session.getOutgoingHandler(),session.getPolicy().getMaxTextMessageBufferSize(),session.getBufferPool());
}
private void assertNotClosed() throws IOException
{
if (closed)
{
IOException e = new IOException("Stream is closed");
notifyFailure(e);
throw e;
}
this.utf = Utf8CharBuffer.wrap(buffer);
}
@Override
public synchronized void close() throws IOException
{
assertNotClosed();
// finish sending whatever in the buffer with FIN=true
flush(true);
// close stream
closed = true;
if (callback != null)
{
callback.writeSuccess();
}
bufferPool.release(buffer);
LOG.debug("closed (frame count={})",frameCount);
}
@Override
public void flush() throws IOException
{
assertNotClosed();
// flush whatever is in the buffer with FIN=false
flush(false);
}
/**
* Flush whatever is in the buffer.
*
* @param fin
* fin flag
* @throws IOException
*/
private synchronized void flush(boolean fin) throws IOException
{
ByteBuffer data = utf.getByteBuffer();
frame.setPayload(data);
frame.setFin(fin);
try
{
outgoing.outgoingFrame(frame,blocker, BatchMode.OFF);
// block on write
blocker.block();
// write success
// clear utf buffer
utf.clear();
frameCount++;
frame.setIsContinuation();
}
catch (IOException e)
{
notifyFailure(e);
throw e;
}
}
private void notifyFailure(IOException e)
{
if (callback != null)
{
callback.writeFailed(e);
}
}
public void setCallback(WriteCallback callback)
{
this.callback = callback;
}
@Override
public void write(char[] cbuf) throws IOException
public void write(char[] chars, int off, int len) throws IOException
{
try
{
this.write(cbuf,0,cbuf.length);
send(chars, off, len);
}
catch (IOException e)
catch (Throwable x)
{
notifyFailure(e);
throw e;
}
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException
{
assertNotClosed();
int left = len; // bytes left to write
int offset = off; // offset within provided array
while (left > 0)
{
int space = utf.remaining();
int size = Math.min(space,left);
assert (space > 0);
assert (size > 0);
utf.append(cbuf,offset,size); // append with utf logic
left -= size; // decrement char left
if (left > 0)
{
flush(false);
}
offset += size; // increment offset
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void write(int c) throws IOException
{
assertNotClosed();
// buffer up to limit, flush once buffer reached.
utf.append(c); // append with utf logic
if (utf.remaining() <= 0)
try
{
send(new char[]{(char)c}, 0, 1);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void flush() throws IOException
{
try
{
flush(false);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void close() throws IOException
{
try
{
flush(true);
bufferPool.release(buffer);
LOG.debug("Stream closed, {} frames sent", frameCount);
// Notify without holding locks.
notifySuccess();
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
private void flush(boolean fin) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");
closed = fin;
ByteBuffer data = utf.getByteBuffer();
LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
frame.setPayload(data);
frame.setFin(fin);
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();
++frameCount;
// Any flush after the first will be a CONTINUATION frame.
frame.setIsContinuation();
utf.clear();
}
}
private void send(char[] chars, int offset, int length) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");
while (length > 0)
{
// There may be no space available, we want
// to handle correctly when space == 0.
int space = utf.remaining();
int size = Math.min(space, length);
utf.append(chars, offset, size);
offset += size;
length -= size;
if (length > 0)
{
// If we could not write everything, it means
// that the buffer was full, so flush it.
flush(false);
}
}
}
}
public void setCallback(WriteCallback callback)
{
synchronized (this)
{
this.callback = callback;
}
}
private void notifySuccess()
{
WriteCallback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.writeSuccess();
}
}
private void notifyFailure(Throwable failure)
{
WriteCallback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.writeFailed(failure);
}
}
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -36,6 +34,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.hamcrest.Matchers.is;
public class MessageInputStreamTest
{
@Rule
@ -49,7 +49,7 @@ public class MessageInputStreamTest
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
try (MessageInputStream stream = new MessageInputStream(conn))
try (MessageInputStream stream = new MessageInputStream())
{
// Append a single message (simple, short)
ByteBuffer payload = BufferUtil.toBuffer("Hello World",StandardCharsets.UTF_8);
@ -72,7 +72,7 @@ public class MessageInputStreamTest
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
try (MessageInputStream stream = new MessageInputStream(conn))
try (MessageInputStream stream = new MessageInputStream())
{
final AtomicBoolean hadError = new AtomicBoolean(false);
final CountDownLatch startLatch = new CountDownLatch(1);
@ -123,7 +123,7 @@ public class MessageInputStreamTest
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
try (MessageInputStream stream = new MessageInputStream(conn))
try (MessageInputStream stream = new MessageInputStream())
{
final AtomicBoolean hadError = new AtomicBoolean(false);
@ -162,7 +162,7 @@ public class MessageInputStreamTest
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
try (MessageInputStream stream = new MessageInputStream(conn))
try (MessageInputStream stream = new MessageInputStream())
{
final AtomicBoolean hadError = new AtomicBoolean(false);