From cf37533d87007cfc9736ff8699b63ccd0449bf06 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 6 Feb 2014 21:18:21 +0100 Subject: [PATCH] 427587 - MessageInputStream must copy the payload. --- .../common/message/MessageInputStream.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 4f2a6c9d298..640cc4b0581 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -41,25 +41,19 @@ public class MessageInputStream extends InputStream implements MessageAppender private static final Logger LOG = Log.getLogger(MessageInputStream.class); // EOF (End of Buffers) private final static ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer(); - /** - * Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet. - */ - @SuppressWarnings("unused") - private final LogicalConnection connection; + private final BlockingDeque buffers = new LinkedBlockingDeque<>(); private AtomicBoolean closed = new AtomicBoolean(false); + private final long timeoutMs; private ByteBuffer activeBuffer = null; - private long timeoutMs = -1; public MessageInputStream(LogicalConnection connection) { - this.connection = connection; - this.timeoutMs = -1; // disabled + this(connection, -1); } public MessageInputStream(LogicalConnection connection, int timeoutMs) { - this.connection = connection; this.timeoutMs = timeoutMs; } @@ -71,16 +65,22 @@ public class MessageInputStream extends InputStream implements MessageAppender LOG.debug("appendMessage(ByteBuffer,{}): {}",fin,BufferUtil.toDetailString(framePayload)); } - // if closed, we should just toss incoming payloads into the bit bucket. + // If closed, we should just toss incoming payloads into the bit bucket. if (closed.get()) { return; } - // Put the payload into the queue + // Put the payload into the queue, by copying it. + // Copying is necessary because the payload will + // be processed after this method returns. try { - buffers.put(framePayload); + int capacity = framePayload.remaining(); + // TODO: the copy buffer should be pooled too, but no buffer pool available from here. + ByteBuffer copy = framePayload.isDirect() ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + copy.put(framePayload); + buffers.put(copy); } catch (InterruptedException e) { @@ -141,7 +141,7 @@ public class MessageInputStream extends InputStream implements MessageAppender // grab a fresh buffer while (activeBuffer == null || !activeBuffer.hasRemaining()) { - if (timeoutMs == -1) + if (timeoutMs < 0) { // infinite take activeBuffer = buffers.take();