427587 - MessageInputStream must copy the payload.

This commit is contained in:
Simone Bordet 2014-02-06 21:18:21 +01:00
parent e2805b5b1b
commit cf37533d87
1 changed files with 13 additions and 13 deletions

View File

@ -41,25 +41,19 @@ public class MessageInputStream extends InputStream implements MessageAppender
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
// EOF (End of Buffers)
private final static ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
/**
* Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
@SuppressWarnings("unused")
private final LogicalConnection connection;
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private AtomicBoolean closed = new AtomicBoolean(false);
private final long timeoutMs;
private ByteBuffer activeBuffer = null;
private long timeoutMs = -1;
public MessageInputStream(LogicalConnection connection)
{
this.connection = connection;
this.timeoutMs = -1; // disabled
this(connection, -1);
}
public MessageInputStream(LogicalConnection connection, int timeoutMs)
{
this.connection = connection;
this.timeoutMs = timeoutMs;
}
@ -71,16 +65,22 @@ public class MessageInputStream extends InputStream implements MessageAppender
LOG.debug("appendMessage(ByteBuffer,{}): {}",fin,BufferUtil.toDetailString(framePayload));
}
// if closed, we should just toss incoming payloads into the bit bucket.
// If closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{
return;
}
// Put the payload into the queue
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
try
{
buffers.put(framePayload);
int capacity = framePayload.remaining();
// TODO: the copy buffer should be pooled too, but no buffer pool available from here.
ByteBuffer copy = framePayload.isDirect() ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
copy.put(framePayload);
buffers.put(copy);
}
catch (InterruptedException e)
{
@ -141,7 +141,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
if (timeoutMs == -1)
if (timeoutMs < 0)
{
// infinite take
activeBuffer = buffers.take();