diff --git a/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java b/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java index 691eea4649a..11fb55188c7 100644 --- a/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java +++ b/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java @@ -121,7 +121,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes if (OpCode.isDataFrame(frame.getOpCode())) { - messageSink.accept(frame, callback); + messageSink.accept(Frame.copy(frame), callback); if (frame.isFin()) { messageSink = null; diff --git a/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageOutputStream.java b/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageOutputStream.java index 92bac19a7b1..8d60377d580 100644 --- a/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageOutputStream.java +++ b/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageOutputStream.java @@ -55,7 +55,6 @@ public class MessageOutputStream extends OutputStream this.bufferPool = bufferPool; this.bufferSize = coreSession.getOutputBufferSize(); this.buffer = bufferPool.acquire(bufferSize, true); - BufferUtil.clear(buffer); } void setMessageType(byte opcode) @@ -93,6 +92,20 @@ public class MessageOutputStream extends OutputStream } } + public void write(ByteBuffer buffer) throws IOException + { + try + { + send(buffer); + } + catch (Throwable x) + { + // Notify without holding locks. + notifyFailure(x); + throw x; + } + } + @Override public void flush() throws IOException { diff --git a/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageWriter.java b/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageWriter.java index ddc2b85fbef..7a5176a535b 100644 --- a/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageWriter.java +++ b/jetty-websocket/websocket-util/src/main/java/org/eclipse/jetty/websocket/util/messages/MessageWriter.java @@ -20,19 +20,12 @@ package org.eclipse.jetty.websocket.util.messages; import java.io.IOException; import java.io.Writer; -import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.FutureCallback; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.CoreSession; -import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.OpCode; import static java.nio.charset.StandardCharsets.UTF_8; @@ -44,180 +37,33 @@ import static java.nio.charset.StandardCharsets.UTF_8; */ public class MessageWriter extends Writer { - private static final Logger LOG = Log.getLogger(MessageWriter.class); - + private final MessageOutputStream outputStream; private final CharsetEncoder utf8Encoder = UTF_8.newEncoder() .onUnmappableCharacter(CodingErrorAction.REPORT) .onMalformedInput(CodingErrorAction.REPORT); - private final CoreSession coreSession; - private long frameCount; - private Frame frame; - private CharBuffer buffer; - private Callback callback; - private boolean closed; - public MessageWriter(CoreSession coreSession, ByteBufferPool bufferPool) { - this.coreSession = coreSession; - this.buffer = CharBuffer.allocate(coreSession.getOutputBufferSize()); - this.frame = new Frame(OpCode.TEXT); + this.outputStream = new MessageOutputStream(coreSession, bufferPool); + this.outputStream.setMessageType(OpCode.TEXT); } @Override - public void write(char[] chars, int off, int len) throws IOException + public void write(char[] cbuf, int off, int len) throws IOException { - try - { - send(chars, off, len); - } - catch (Throwable x) - { - // Notify without holding locks. - notifyFailure(x); - throw x; - } - } - - @Override - public void write(int c) throws IOException - { - try - { - send(new char[]{(char)c}, 0, 1); - } - catch (Throwable x) - { - // Notify without holding locks. - notifyFailure(x); - throw x; - } + CharBuffer charBuffer = CharBuffer.wrap(cbuf, off, len); + outputStream.write(utf8Encoder.encode(charBuffer)); } @Override public void flush() throws IOException { - try - { - flush(false); - } - catch (Throwable x) - { - // Notify without holding locks. - notifyFailure(x); - throw x; - } - } - - private void flush(boolean fin) throws IOException - { - synchronized (this) - { - if (closed) - throw new IOException("Stream is closed"); - - closed = fin; - - buffer.flip(); - ByteBuffer payload = utf8Encoder.encode(buffer); - buffer.flip(); - - if (LOG.isDebugEnabled()) - LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(payload)); - frame.setPayload(payload); - frame.setFin(fin); - - FutureCallback b = new FutureCallback(); - coreSession.sendFrame(frame, b, false); - b.block(); - - ++frameCount; - // Any flush after the first will be a CONTINUATION frame. - frame = new Frame(OpCode.CONTINUATION); - } - } - - private void send(char[] chars, int offset, int length) throws IOException - { - synchronized (this) - { - if (closed) - throw new IOException("Stream is closed"); - - CharBuffer source = CharBuffer.wrap(chars, offset, length); - - int remaining = length; - - while (remaining > 0) - { - int read = source.read(buffer); - if (read == -1) - { - return; - } - - remaining -= read; - - if (remaining > 0) - { - // If we could not write everything, it means - // that the buffer was full, so flush it. - flush(false); - } - } - } + outputStream.flush(); } @Override public void close() throws IOException { - try - { - flush(true); - if (LOG.isDebugEnabled()) - LOG.debug("Stream closed, {} frames sent", frameCount); - // Notify without holding locks. - notifySuccess(); - } - catch (Throwable x) - { - // Notify without holding locks. - notifyFailure(x); - throw x; - } - } - - public void setCallback(Callback callback) - { - synchronized (this) - { - this.callback = callback; - } - } - - private void notifySuccess() - { - Callback callback; - synchronized (this) - { - callback = this.callback; - } - if (callback != null) - { - callback.succeeded(); - } - } - - private void notifyFailure(Throwable failure) - { - Callback callback; - synchronized (this) - { - callback = this.callback; - } - if (callback != null) - { - callback.failed(failure); - } + outputStream.close(); } } \ No newline at end of file