diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java index 7a1d66d7a79..a550c20b8b4 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; import java.util.Map; - import javax.websocket.CloseReason; import javax.websocket.DecodeException; @@ -103,7 +102,7 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E if (activeMessage == null) { LOG.debug("Binary Message InputStream"); - final MessageInputStream stream = new MessageInputStream(session.getConnection()); + final MessageInputStream stream = new MessageInputStream(); activeMessage = stream; // Always dispatch streaming read to another thread. @@ -311,7 +310,7 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E { LOG.debug("Text Message Writer"); - final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection())); + final MessageReader stream = new MessageReader(new MessageInputStream()); activeMessage = stream; // Always dispatch streaming read to another thread. diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java index 29536c81a50..b977147fb91 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; import java.util.Map; - import javax.websocket.CloseReason; import javax.websocket.Endpoint; import javax.websocket.MessageHandler; @@ -86,7 +85,7 @@ public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements Ev } else if (wrapper.wantsStreams()) { - final MessageInputStream stream = new MessageInputStream(session.getConnection()); + final MessageInputStream stream = new MessageInputStream(); activeMessage = stream; dispatch(new Runnable() { @@ -181,7 +180,7 @@ public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements Ev } else if (wrapper.wantsStreams()) { - final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection())); + final MessageReader stream = new MessageReader(new MessageInputStream()); activeMessage = stream; dispatch(new Runnable() diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java new file mode 100644 index 00000000000..1dc6b02863f --- /dev/null +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java @@ -0,0 +1,177 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.websocket.ClientEndpoint; +import javax.websocket.ContainerProvider; +import javax.websocket.OnMessage; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import javax.websocket.server.ServerEndpoint; +import javax.websocket.server.ServerEndpointConfig; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BinaryStreamTest +{ + private static final String PATH = "/echo"; + + private Server server; + private ServerConnector connector; + private WebSocketContainer wsClient; + + @Before + public void prepare() throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(server, "/", true, false); + ServerContainer container = WebSocketServerContainerInitializer.configureContext(context); + ServerEndpointConfig config = ServerEndpointConfig.Builder.create(ServerBinaryStreamer.class, PATH).build(); + container.addEndpoint(config); + + server.start(); + + wsClient = ContainerProvider.getWebSocketContainer(); + server.addBean(wsClient, true); + } + + @After + public void dispose() throws Exception + { + server.stop(); + } + + @Test + public void testEchoWithMediumMessage() throws Exception + { + testEcho(1024); + } + + @Test + public void testLargestMessage() throws Exception + { + testEcho(wsClient.getDefaultMaxBinaryMessageBufferSize()); + } + + private void testEcho(int size) throws Exception + { + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + ClientBinaryStreamer client = new ClientBinaryStreamer(); + Session session = wsClient.connectToServer(client, uri); + + try (OutputStream output = session.getBasicRemote().getSendStream()) + { + output.write(data); + } + + Assert.assertTrue(client.await(5, TimeUnit.SECONDS)); + Assert.assertArrayEquals(data, client.getEcho()); + } + + @Test + public void testMoreThanLargestMessageOneByteAtATime() throws Exception + { + int size = wsClient.getDefaultMaxBinaryMessageBufferSize() + 16; + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + ClientBinaryStreamer client = new ClientBinaryStreamer(); + Session session = wsClient.connectToServer(client, uri); + + try (OutputStream output = session.getBasicRemote().getSendStream()) + { + for (int i = 0; i < size; ++i) + output.write(data[i]); + } + + Assert.assertTrue(client.await(5, TimeUnit.SECONDS)); + Assert.assertArrayEquals(data, client.getEcho()); + } + + private byte[] randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return data; + } + + @ClientEndpoint + public static class ClientBinaryStreamer + { + private final CountDownLatch latch = new CountDownLatch(1); + private final ByteArrayOutputStream output = new ByteArrayOutputStream(); + + @OnMessage + public void echoed(InputStream input) throws IOException + { + while (true) + { + int read = input.read(); + if (read < 0) + break; + output.write(read); + } + latch.countDown(); + } + + public byte[] getEcho() + { + return output.toByteArray(); + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + return latch.await(timeout, unit); + } + } + + @ServerEndpoint(PATH) + public static class ServerBinaryStreamer + { + @OnMessage + public void echo(Session session, InputStream input) throws IOException + { + byte[] buffer = new byte[128]; + try (OutputStream output = session.getBasicRemote().getSendStream()) + { + int read; + while ((read = input.read(buffer)) >= 0) + output.write(buffer, 0, read); + } + } + } +} diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java new file mode 100644 index 00000000000..dd66d135bf3 --- /dev/null +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java @@ -0,0 +1,179 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.server; + +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.websocket.ClientEndpoint; +import javax.websocket.ContainerProvider; +import javax.websocket.OnMessage; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import javax.websocket.server.ServerEndpoint; +import javax.websocket.server.ServerEndpointConfig; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TextStreamTest +{ + private static final String PATH = "/echo"; + private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + + private Server server; + private ServerConnector connector; + private WebSocketContainer wsClient; + + @Before + public void prepare() throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(server, "/", true, false); + ServerContainer container = WebSocketServerContainerInitializer.configureContext(context); + ServerEndpointConfig config = ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build(); + container.addEndpoint(config); + + server.start(); + + wsClient = ContainerProvider.getWebSocketContainer(); + server.addBean(wsClient, true); + } + + @After + public void dispose() throws Exception + { + server.stop(); + } + + @Test + public void testEchoWithMediumMessage() throws Exception + { + testEcho(1024); + } + + @Test + public void testLargestMessage() throws Exception + { + testEcho(wsClient.getDefaultMaxBinaryMessageBufferSize()); + } + + private void testEcho(int size) throws Exception + { + char[] data = randomChars(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + ClientTextStreamer client = new ClientTextStreamer(); + Session session = wsClient.connectToServer(client, uri); + + try (Writer output = session.getBasicRemote().getSendWriter()) + { + output.write(data); + } + + Assert.assertTrue(client.await(5, TimeUnit.SECONDS)); + Assert.assertArrayEquals(data, client.getEcho()); + } + + @Test + public void testMoreThanLargestMessageOneByteAtATime() throws Exception + { + int size = wsClient.getDefaultMaxBinaryMessageBufferSize() + 16; + char[] data = randomChars(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + ClientTextStreamer client = new ClientTextStreamer(); + Session session = wsClient.connectToServer(client, uri); + + try (Writer output = session.getBasicRemote().getSendWriter()) + { + for (int i = 0; i < size; ++i) + output.write(data[i]); + } + + Assert.assertTrue(client.await(5, TimeUnit.SECONDS)); + Assert.assertArrayEquals(data, client.getEcho()); + } + + private char[] randomChars(int size) + { + char[] data = new char[size]; + Random random = new Random(); + for (int i = 0; i < data.length; ++i) + data[i] = CHARS.charAt(random.nextInt(CHARS.length())); + return data; + } + + @ClientEndpoint + public static class ClientTextStreamer + { + private final CountDownLatch latch = new CountDownLatch(1); + private final StringBuilder output = new StringBuilder(); + + @OnMessage + public void echoed(Reader input) throws IOException + { + while (true) + { + int read = input.read(); + if (read < 0) + break; + output.append((char)read); + } + latch.countDown(); + } + + public char[] getEcho() + { + return output.toString().toCharArray(); + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + return latch.await(timeout, unit); + } + } + + @ServerEndpoint(PATH) + public static class ServerTextStreamer + { + @OnMessage + public void echo(Session session, Reader input) throws IOException + { + char[] buffer = new char[128]; + try (Writer output = session.getBasicRemote().getSendWriter()) + { + int read; + while ((read = input.read(buffer)) >= 0) + output.write(buffer, 0, read); + } + } + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java index 8d075b49945..1536b72dbb8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java @@ -79,7 +79,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver { if (events.onBinary.isStreaming()) { - activeMessage = new MessageInputStream(session.getConnection()); + activeMessage = new MessageInputStream(); final MessageAppender msg = activeMessage; dispatch(new Runnable() { @@ -181,7 +181,7 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver { if (events.onText.isStreaming()) { - activeMessage = new MessageReader(new MessageInputStream(session.getConnection())); + activeMessage = new MessageReader(new MessageInputStream()); final MessageAppender msg = activeMessage; dispatch(new Runnable() { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index a053dff0572..0a81cbdf8a5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -29,30 +29,28 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.common.LogicalConnection; /** * Support class for reading a (single) WebSocket BINARY message via a InputStream. - *

+ *

* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior. */ public class MessageInputStream extends InputStream implements MessageAppender { private static final Logger LOG = Log.getLogger(MessageInputStream.class); - // EOF (End of Buffers) - private final static ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer(); + private static final ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer(); private final BlockingDeque buffers = new LinkedBlockingDeque<>(); private AtomicBoolean closed = new AtomicBoolean(false); private final long timeoutMs; private ByteBuffer activeBuffer = null; - public MessageInputStream(LogicalConnection connection) + public MessageInputStream() { - this(connection, -1); + this(-1); } - - public MessageInputStream(LogicalConnection connection, int timeoutMs) + + public MessageInputStream(int timeoutMs) { this.timeoutMs = timeoutMs; } @@ -61,9 +59,7 @@ public class MessageInputStream extends InputStream implements MessageAppender public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException { if (LOG.isDebugEnabled()) - { - LOG.debug("appendMessage(ByteBuffer,{}): {}",fin,BufferUtil.toDetailString(framePayload)); - } + LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); // If closed, we should just toss incoming payloads into the bit bucket. if (closed.get()) @@ -98,7 +94,7 @@ public class MessageInputStream extends InputStream implements MessageAppender @Override public void close() throws IOException { - if (closed.compareAndSet(false,true)) + if (closed.compareAndSet(false, true)) { buffers.offer(EOF); super.close(); @@ -106,9 +102,9 @@ public class MessageInputStream extends InputStream implements MessageAppender } @Override - public synchronized void mark(int readlimit) + public void mark(int readlimit) { - /* do nothing */ + // Not supported. } @Override @@ -120,62 +116,64 @@ public class MessageInputStream extends InputStream implements MessageAppender @Override public void messageComplete() { - LOG.debug("messageComplete()"); - - // toss an empty ByteBuffer into queue to let it drain + LOG.debug("Message completed"); buffers.offer(EOF); } @Override public int read() throws IOException { - LOG.debug("read()"); - try { if (closed.get()) { + LOG.debug("Stream closed"); return -1; } // grab a fresh buffer while (activeBuffer == null || !activeBuffer.hasRemaining()) { + if (LOG.isDebugEnabled()) + LOG.debug("Waiting {} ms to read", timeoutMs); if (timeoutMs < 0) { - // infinite take + // Wait forever until a buffer is available. activeBuffer = buffers.take(); } else { - // timeout specific - activeBuffer = buffers.poll(timeoutMs,TimeUnit.MILLISECONDS); + // Wait at most for the given timeout. + activeBuffer = buffers.poll(timeoutMs, TimeUnit.MILLISECONDS); if (activeBuffer == null) { - throw new IOException(String.format("Read timeout: %,dms expired",timeoutMs)); + throw new IOException(String.format("Read timeout: %,dms expired", timeoutMs)); } } - + if (activeBuffer == EOF) { + LOG.debug("Reached EOF"); + // Be sure that this stream cannot be reused. closed.set(true); + // Removed buffers that may have remained in the queue. + buffers.clear(); return -1; } } - return activeBuffer.get(); + return activeBuffer.get() & 0xFF; } - catch (InterruptedException e) + catch (InterruptedException x) { - LOG.warn(e); + LOG.debug("Interrupted while waiting to read", x); closed.set(true); return -1; - // throw new IOException(e); } } @Override - public synchronized void reset() throws IOException + public void reset() throws IOException { throw new IOException("reset() not supported"); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index a34301fb34b..99364114602 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -39,183 +39,177 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame; public class MessageOutputStream extends OutputStream { private static final Logger LOG = Log.getLogger(MessageOutputStream.class); + private final OutgoingFrames outgoing; private final ByteBufferPool bufferPool; private final BlockingWriteCallback blocker; - private long frameCount = 0; + private long frameCount; private BinaryFrame frame; private ByteBuffer buffer; private WriteCallback callback; - private boolean closed = false; + private boolean closed; + + public MessageOutputStream(WebSocketSession session) + { + this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool()); + } public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool) { this.outgoing = outgoing; this.bufferPool = bufferPool; this.blocker = new BlockingWriteCallback(); - this.buffer = bufferPool.acquire(bufferSize,true); + this.buffer = bufferPool.acquire(bufferSize, true); BufferUtil.flipToFill(buffer); this.frame = new BinaryFrame(); } - public MessageOutputStream(WebSocketSession session) + @Override + public void write(byte[] bytes, int off, int len) throws IOException { - this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool()); - } - - private void assertNotClosed() throws IOException - { - if (closed) + try { - IOException e = new IOException("Stream is closed"); - notifyFailure(e); - throw e; + send(bytes, off, len); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; } } @Override - public synchronized void close() throws IOException + public void write(int b) throws IOException { - assertNotClosed(); - LOG.debug("close()"); - - // finish sending whatever in the buffer with FIN=true - flush(true); - - // close stream - LOG.debug("Sent Frame Count: {}",frameCount); - closed = true; try { - if (callback != null) - { - callback.writeSuccess(); - } - super.close(); + send(new byte[]{(byte)b}, 0, 1); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } + + @Override + public void flush() throws IOException + { + try + { + flush(false); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } + + @Override + public void close() throws IOException + { + try + { + flush(true); bufferPool.release(buffer); - LOG.debug("closed"); + LOG.debug("Stream closed, {} frames sent", frameCount); + // Notify without holding locks. + notifySuccess(); } - catch (IOException e) + catch (Throwable x) { - notifyFailure(e); - throw e; + // Notify without holding locks. + notifyFailure(x); + throw x; } } - @Override - public synchronized void flush() throws IOException + private void flush(boolean fin) throws IOException { - LOG.debug("flush()"); - assertNotClosed(); - - // flush whatever is in the buffer with FIN=false - flush(false); - try + synchronized (this) { - super.flush(); - LOG.debug("flushed"); - } - catch (IOException e) - { - notifyFailure(e); - throw e; - } - } + if (closed) + throw new IOException("Stream is closed"); - /** - * Flush whatever is in the buffer. - * - * @param fin - * fin flag - * @throws IOException - */ - private synchronized void flush(boolean fin) throws IOException - { - BufferUtil.flipToFlush(buffer,0); - LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer)); - frame.setPayload(buffer); - frame.setFin(fin); + closed = fin; - try - { - outgoing.outgoingFrame(frame,blocker, BatchMode.OFF); - // block on write + BufferUtil.flipToFlush(buffer, 0); + LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer)); + frame.setPayload(buffer); + frame.setFin(fin); + + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); blocker.block(); - // block success - frameCount++; + + ++frameCount; + // Any flush after the first will be a CONTINUATION frame. frame.setIsContinuation(); + BufferUtil.flipToFill(buffer); } - catch (IOException e) - { - notifyFailure(e); - throw e; - } } - private void notifyFailure(IOException e) + private void send(byte[] bytes, int offset, int length) throws IOException { - if (callback != null) + synchronized (this) { - callback.writeFailed(e); + if (closed) + throw new IOException("Stream is closed"); + + while (length > 0) + { + // There may be no space available, we want + // to handle correctly when space == 0. + int space = buffer.remaining(); + int size = Math.min(space, length); + buffer.put(bytes, offset, size); + offset += size; + length -= size; + if (length > 0) + { + // If we could not write everything, it means + // that the buffer was full, so flush it. + flush(false); + } + } } } public void setCallback(WriteCallback callback) { - this.callback = callback; - } - - @Override - public synchronized void write(byte[] b) throws IOException - { - try + synchronized (this) { - this.write(b,0,b.length); - } - catch (IOException e) - { - notifyFailure(e); - throw e; + this.callback = callback; } } - @Override - public synchronized void write(byte[] b, int off, int len) throws IOException + private void notifySuccess() { - LOG.debug("write(byte[{}], {}, {})",b.length,off,len); - int left = len; // bytes left to write - int offset = off; // offset within provided array - while (left > 0) + WriteCallback callback; + synchronized (this) { - if (LOG.isDebugEnabled()) - { - LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer)); - } - int space = buffer.remaining(); - assert (space > 0); - int size = Math.min(space,left); - buffer.put(b,offset,size); - assert (size > 0); - left -= size; // decrement bytes left - if (left > 0) - { - flush(false); - } - offset += size; // increment offset + callback = this.callback; + } + if (callback != null) + { + callback.writeSuccess(); } } - @Override - public synchronized void write(int b) throws IOException + private void notifyFailure(Throwable failure) { - assertNotClosed(); - - // buffer up to limit, flush once buffer reached. - buffer.put((byte)b); - if (buffer.remaining() <= 0) + WriteCallback callback; + synchronized (this) { - flush(false); + callback = this.callback; + } + if (callback != null) + { + callback.writeFailed(failure); } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java index ef76ecbcbc6..477046b2089 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java @@ -21,11 +21,12 @@ package org.eclipse.jetty.websocket.common.message; import java.io.IOException; import java.io.InputStreamReader; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; /** * Support class for reading a (single) WebSocket TEXT message via a Reader. - *

+ *

* In compliance to the WebSocket spec, this reader always uses the UTF8 {@link Charset}. */ public class MessageReader extends InputStreamReader implements MessageAppender @@ -34,14 +35,14 @@ public class MessageReader extends InputStreamReader implements MessageAppender public MessageReader(MessageInputStream stream) { - super(stream,StandardCharsets.UTF_8); + super(stream, StandardCharsets.UTF_8); this.stream = stream; } @Override public void appendFrame(ByteBuffer payload, boolean isLast) throws IOException { - this.stream.appendFrame(payload,isLast); + this.stream.appendFrame(payload, isLast); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 8fbbbc39347..aa3852ede86 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -35,165 +35,185 @@ import org.eclipse.jetty.websocket.common.frames.TextFrame; /** * Support for writing a single WebSocket TEXT message via a {@link Writer} - *

+ *

* Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8 */ public class MessageWriter extends Writer { private static final Logger LOG = Log.getLogger(MessageWriter.class); + private final OutgoingFrames outgoing; private final ByteBufferPool bufferPool; private final BlockingWriteCallback blocker; - private long frameCount = 0; + private long frameCount; private TextFrame frame; private ByteBuffer buffer; private Utf8CharBuffer utf; private WriteCallback callback; - private boolean closed = false; + private boolean closed; + + public MessageWriter(WebSocketSession session) + { + this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool()); + } public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool) { this.outgoing = outgoing; this.bufferPool = bufferPool; this.blocker = new BlockingWriteCallback(); - this.buffer = bufferPool.acquire(bufferSize,true); + this.buffer = bufferPool.acquire(bufferSize, true); BufferUtil.flipToFill(buffer); - this.utf = Utf8CharBuffer.wrap(buffer); this.frame = new TextFrame(); - } - - public MessageWriter(WebSocketSession session) - { - this(session.getOutgoingHandler(),session.getPolicy().getMaxTextMessageBufferSize(),session.getBufferPool()); - } - - private void assertNotClosed() throws IOException - { - if (closed) - { - IOException e = new IOException("Stream is closed"); - notifyFailure(e); - throw e; - } + this.utf = Utf8CharBuffer.wrap(buffer); } @Override - public synchronized void close() throws IOException - { - assertNotClosed(); - - // finish sending whatever in the buffer with FIN=true - flush(true); - - // close stream - closed = true; - if (callback != null) - { - callback.writeSuccess(); - } - bufferPool.release(buffer); - LOG.debug("closed (frame count={})",frameCount); - } - - @Override - public void flush() throws IOException - { - assertNotClosed(); - - // flush whatever is in the buffer with FIN=false - flush(false); - } - - /** - * Flush whatever is in the buffer. - * - * @param fin - * fin flag - * @throws IOException - */ - private synchronized void flush(boolean fin) throws IOException - { - ByteBuffer data = utf.getByteBuffer(); - frame.setPayload(data); - frame.setFin(fin); - - try - { - outgoing.outgoingFrame(frame,blocker, BatchMode.OFF); - // block on write - blocker.block(); - // write success - // clear utf buffer - utf.clear(); - frameCount++; - frame.setIsContinuation(); - } - catch (IOException e) - { - notifyFailure(e); - throw e; - } - } - - private void notifyFailure(IOException e) - { - if (callback != null) - { - callback.writeFailed(e); - } - } - - public void setCallback(WriteCallback callback) - { - this.callback = callback; - } - - @Override - public void write(char[] cbuf) throws IOException + public void write(char[] chars, int off, int len) throws IOException { try { - this.write(cbuf,0,cbuf.length); + send(chars, off, len); } - catch (IOException e) + catch (Throwable x) { - notifyFailure(e); - throw e; - } - } - - @Override - public void write(char[] cbuf, int off, int len) throws IOException - { - assertNotClosed(); - int left = len; // bytes left to write - int offset = off; // offset within provided array - while (left > 0) - { - int space = utf.remaining(); - int size = Math.min(space,left); - assert (space > 0); - assert (size > 0); - utf.append(cbuf,offset,size); // append with utf logic - left -= size; // decrement char left - if (left > 0) - { - flush(false); - } - offset += size; // increment offset + // Notify without holding locks. + notifyFailure(x); + throw x; } } @Override public void write(int c) throws IOException { - assertNotClosed(); + try + { + send(new char[]{(char)c}, 0, 1); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } - // buffer up to limit, flush once buffer reached. - utf.append(c); // append with utf logic - if (utf.remaining() <= 0) + @Override + public void flush() throws IOException + { + try { flush(false); } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } + + @Override + public void close() throws IOException + { + try + { + flush(true); + bufferPool.release(buffer); + LOG.debug("Stream closed, {} frames sent", frameCount); + // Notify without holding locks. + notifySuccess(); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } + + private void flush(boolean fin) throws IOException + { + synchronized (this) + { + if (closed) + throw new IOException("Stream is closed"); + + closed = fin; + + ByteBuffer data = utf.getByteBuffer(); + LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer)); + frame.setPayload(data); + frame.setFin(fin); + + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); + + ++frameCount; + // Any flush after the first will be a CONTINUATION frame. + frame.setIsContinuation(); + + utf.clear(); + } + } + + private void send(char[] chars, int offset, int length) throws IOException + { + synchronized (this) + { + if (closed) + throw new IOException("Stream is closed"); + + while (length > 0) + { + // There may be no space available, we want + // to handle correctly when space == 0. + int space = utf.remaining(); + int size = Math.min(space, length); + utf.append(chars, offset, size); + offset += size; + length -= size; + if (length > 0) + { + // If we could not write everything, it means + // that the buffer was full, so flush it. + flush(false); + } + } + } + } + + public void setCallback(WriteCallback callback) + { + synchronized (this) + { + this.callback = callback; + } + } + + private void notifySuccess() + { + WriteCallback callback; + synchronized (this) + { + callback = this.callback; + } + if (callback != null) + { + callback.writeSuccess(); + } + } + + private void notifyFailure(Throwable failure) + { + WriteCallback callback; + synchronized (this) + { + callback = this.callback; + } + if (callback != null) + { + callback.writeFailed(failure); + } } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index e1dfc5f9d87..b71149e6a3f 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.message; -import static org.hamcrest.Matchers.*; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -36,6 +34,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import static org.hamcrest.Matchers.is; + public class MessageInputStreamTest { @Rule @@ -49,7 +49,7 @@ public class MessageInputStreamTest { LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream(conn)) + try (MessageInputStream stream = new MessageInputStream()) { // Append a single message (simple, short) ByteBuffer payload = BufferUtil.toBuffer("Hello World",StandardCharsets.UTF_8); @@ -72,7 +72,7 @@ public class MessageInputStreamTest { LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream(conn)) + try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false); final CountDownLatch startLatch = new CountDownLatch(1); @@ -123,7 +123,7 @@ public class MessageInputStreamTest { LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream(conn)) + try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false); @@ -162,7 +162,7 @@ public class MessageInputStreamTest { LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream(conn)) + try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false);