Issue #4538 - MessageWriter delegates to MessageOutputStream

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-02-15 00:17:41 +11:00
parent e2f86f9a19
commit 6eccc7ebce
3 changed files with 23 additions and 164 deletions

View File

@ -121,7 +121,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
if (OpCode.isDataFrame(frame.getOpCode())) if (OpCode.isDataFrame(frame.getOpCode()))
{ {
messageSink.accept(frame, callback); messageSink.accept(Frame.copy(frame), callback);
if (frame.isFin()) if (frame.isFin())
{ {
messageSink = null; messageSink = null;

View File

@ -55,7 +55,6 @@ public class MessageOutputStream extends OutputStream
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.bufferSize = coreSession.getOutputBufferSize(); this.bufferSize = coreSession.getOutputBufferSize();
this.buffer = bufferPool.acquire(bufferSize, true); this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.clear(buffer);
} }
void setMessageType(byte opcode) void setMessageType(byte opcode)
@ -93,6 +92,20 @@ public class MessageOutputStream extends OutputStream
} }
} }
public void write(ByteBuffer buffer) throws IOException
{
try
{
send(buffer);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override @Override
public void flush() throws IOException public void flush() throws IOException
{ {

View File

@ -20,19 +20,12 @@ package org.eclipse.jetty.websocket.util.messages;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer; import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder; import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
@ -44,180 +37,33 @@ import static java.nio.charset.StandardCharsets.UTF_8;
*/ */
public class MessageWriter extends Writer public class MessageWriter extends Writer
{ {
private static final Logger LOG = Log.getLogger(MessageWriter.class); private final MessageOutputStream outputStream;
private final CharsetEncoder utf8Encoder = UTF_8.newEncoder() private final CharsetEncoder utf8Encoder = UTF_8.newEncoder()
.onUnmappableCharacter(CodingErrorAction.REPORT) .onUnmappableCharacter(CodingErrorAction.REPORT)
.onMalformedInput(CodingErrorAction.REPORT); .onMalformedInput(CodingErrorAction.REPORT);
private final CoreSession coreSession;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
private Callback callback;
private boolean closed;
public MessageWriter(CoreSession coreSession, ByteBufferPool bufferPool) public MessageWriter(CoreSession coreSession, ByteBufferPool bufferPool)
{ {
this.coreSession = coreSession; this.outputStream = new MessageOutputStream(coreSession, bufferPool);
this.buffer = CharBuffer.allocate(coreSession.getOutputBufferSize()); this.outputStream.setMessageType(OpCode.TEXT);
this.frame = new Frame(OpCode.TEXT);
} }
@Override @Override
public void write(char[] chars, int off, int len) throws IOException public void write(char[] cbuf, int off, int len) throws IOException
{ {
try CharBuffer charBuffer = CharBuffer.wrap(cbuf, off, len);
{ outputStream.write(utf8Encoder.encode(charBuffer));
send(chars, off, len);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
@Override
public void write(int c) throws IOException
{
try
{
send(new char[]{(char)c}, 0, 1);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
} }
@Override @Override
public void flush() throws IOException public void flush() throws IOException
{ {
try outputStream.flush();
{
flush(false);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
private void flush(boolean fin) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");
closed = fin;
buffer.flip();
ByteBuffer payload = utf8Encoder.encode(buffer);
buffer.flip();
if (LOG.isDebugEnabled())
LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(payload));
frame.setPayload(payload);
frame.setFin(fin);
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
++frameCount;
// Any flush after the first will be a CONTINUATION frame.
frame = new Frame(OpCode.CONTINUATION);
}
}
private void send(char[] chars, int offset, int length) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");
CharBuffer source = CharBuffer.wrap(chars, offset, length);
int remaining = length;
while (remaining > 0)
{
int read = source.read(buffer);
if (read == -1)
{
return;
}
remaining -= read;
if (remaining > 0)
{
// If we could not write everything, it means
// that the buffer was full, so flush it.
flush(false);
}
}
}
} }
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
try outputStream.close();
{
flush(true);
if (LOG.isDebugEnabled())
LOG.debug("Stream closed, {} frames sent", frameCount);
// Notify without holding locks.
notifySuccess();
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}
public void setCallback(Callback callback)
{
synchronized (this)
{
this.callback = callback;
}
}
private void notifySuccess()
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.succeeded();
}
}
private void notifyFailure(Throwable failure)
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.failed(failure);
}
} }
} }