Attempted fix for intermittent test failures around MessageInputStream

This commit is contained in:
Joakim Erdfelt 2013-12-10 12:59:07 -07:00
parent 6f87a9b995
commit 5b77679ecf
3 changed files with 58 additions and 45 deletions

View File

@ -22,21 +22,21 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Appender for messages (used for multiple fragments with continuations, and also to allow for streaming APIs)
* Appender for messages (used for multiple frames with continuations, and also to allow for streaming APIs)
*/
public interface MessageAppender
{
/**
* Append the payload to the message.
* Append the frame payload to the message.
*
* @param payload
* the payload to append.
* @param framePayload
* the frame payload to append.
* @param isLast
* flag indicating if this is the last part of the message or not.
* @throws IOException
* if unable to append the payload
* if unable to append the frame payload
*/
abstract void appendMessage(ByteBuffer payload, boolean isLast) throws IOException;
abstract void appendMessage(ByteBuffer framePayload, boolean isLast) throws IOException;
/**
* Notification that message is to be considered complete.

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
@ -38,6 +39,8 @@ import org.eclipse.jetty.websocket.common.LogicalConnection;
public class MessageInputStream extends InputStream implements MessageAppender
{
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
// EOF (End of Buffers)
private final static ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
/**
* Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
@ -45,27 +48,27 @@ public class MessageInputStream extends InputStream implements MessageAppender
private final LogicalConnection connection;
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private AtomicBoolean closed = new AtomicBoolean(false);
// EOB / End of Buffers
private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
private ByteBuffer activeBuffer = null;
private long timeoutMs = -1;
public MessageInputStream(LogicalConnection connection)
{
this.connection = connection;
this.timeoutMs = -1; // disabled
}
public MessageInputStream(LogicalConnection connection, int timeoutMs)
{
this.connection = connection;
this.timeoutMs = timeoutMs;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
public void appendMessage(ByteBuffer framePayload, boolean fin) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("appendMessage(ByteBuffer,{}): {}",isLast,BufferUtil.toDetailString(payload));
}
if (buffersExhausted.get())
{
// This indicates a programming mistake/error and must be bug fixed
throw new RuntimeException("Last frame already received");
LOG.debug("appendMessage(ByteBuffer,{}): {}",fin,BufferUtil.toDetailString(framePayload));
}
// if closed, we should just toss incoming payloads into the bit bucket.
@ -77,23 +80,29 @@ public class MessageInputStream extends InputStream implements MessageAppender
// Put the payload into the queue
try
{
buffers.put(payload);
if (isLast)
{
buffersExhausted.set(true);
}
buffers.put(framePayload);
}
catch (InterruptedException e)
{
throw new IOException(e);
}
finally
{
if (fin)
{
buffers.offer(EOF);
}
}
}
@Override
public void close() throws IOException
{
closed.set(true);
super.close();
if (closed.compareAndSet(false,true))
{
buffers.offer(EOF);
super.close();
}
}
@Override
@ -112,24 +121,16 @@ public class MessageInputStream extends InputStream implements MessageAppender
public void messageComplete()
{
LOG.debug("messageComplete()");
buffersExhausted.set(true);
// toss an empty ByteBuffer into queue to let it drain
try
{
buffers.put(ByteBuffer.wrap(new byte[0]));
}
catch (InterruptedException ignore)
{
/* ignore */
}
buffers.offer(EOF);
}
@Override
public int read() throws IOException
{
LOG.debug("read()");
try
{
if (closed.get())
@ -137,19 +138,29 @@ public class MessageInputStream extends InputStream implements MessageAppender
return -1;
}
if (activeBuffer == null)
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
activeBuffer = buffers.take();
}
while (activeBuffer.remaining() <= 0)
{
if (buffersExhausted.get())
if (timeoutMs == -1)
{
// infinite take
activeBuffer = buffers.take();
}
else
{
// timeout specific
activeBuffer = buffers.poll(timeoutMs,TimeUnit.MILLISECONDS);
if (activeBuffer == null)
{
throw new IOException(String.format("Read timeout: %,dms expired",timeoutMs));
}
}
if (activeBuffer == EOF)
{
closed.set(true);
return -1;
}
activeBuffer = buffers.take();
}
return activeBuffer.get();
@ -159,7 +170,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
LOG.warn(e);
closed.set(true);
return -1;
// throw new IOException(e);
// throw new IOException(e);
}
}

View File

@ -49,13 +49,13 @@ public class MessageInputStreamTest
try (MessageInputStream stream = new MessageInputStream(conn))
{
// Append a message (simple, short)
// Append a single message (simple, short)
ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
boolean fin = true;
stream.appendMessage(payload,fin);
// Read it from the stream.
// Read entire message it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
@ -75,6 +75,8 @@ public class MessageInputStreamTest
final AtomicBoolean hadError = new AtomicBoolean(false);
final CountDownLatch startLatch = new CountDownLatch(1);
// This thread fills the stream (from the "worker" thread)
// But slowly (intentionally).
new Thread(new Runnable()
{
@Override