Issue #4475 - use state machine for MessageInputStream

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-01-17 18:37:20 +11:00
parent 3fd7094c01
commit 3d309a5797
1 changed files with 80 additions and 41 deletions

View File

@ -48,8 +48,15 @@ public class MessageInputStream extends InputStream implements MessageAppender
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private final long timeoutMs; private final long timeoutMs;
private ByteBuffer activeBuffer = null; private ByteBuffer activeBuffer = null;
private volatile boolean closed = false; private SuspendToken suspendToken;
private volatile SuspendToken suspendToken; private State state = State.RESUMED;
private enum State
{
RESUMED,
SUSPENDED,
CLOSED
}
public MessageInputStream(Session session) public MessageInputStream(Session session)
{ {
@ -61,7 +68,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
this.timeoutMs = timeoutMs; this.timeoutMs = timeoutMs;
this.session = session; this.session = session;
this.bufferPool = (session instanceof WebSocketSession) ? ((WebSocketSession)session).getBufferPool() : null; this.bufferPool = (session instanceof WebSocketSession) ? ((WebSocketSession)session).getBufferPool() : null;
this.suspendToken = session.suspend();
} }
@Override @Override
@ -70,8 +76,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload));
// If closed, we should just toss incoming payloads into the bit bucket. // Early non atomic test that we aren't closed to avoid an unnecessary copy (will be checked again later).
if (closed) if (state == State.CLOSED)
return; return;
// Put the payload into the queue, by copying it. // Put the payload into the queue, by copying it.
@ -89,11 +95,20 @@ public class MessageInputStream extends InputStream implements MessageAppender
synchronized (this) synchronized (this)
{ {
if (closed) switch (state)
return; {
case CLOSED:
return;
case RESUMED:
suspendToken = session.suspend();
state = State.SUSPENDED;
break;
case SUSPENDED:
throw new IllegalStateException();
}
if (suspendToken == null)
suspendToken = session.suspend();
buffers.put(copy); buffers.put(copy);
} }
} }
@ -101,35 +116,37 @@ public class MessageInputStream extends InputStream implements MessageAppender
{ {
throw new IOException(e); throw new IOException(e);
} }
finally
{
if (fin)
buffers.offer(EOF);
}
}
private ByteBuffer acquire(int capacity, boolean direct)
{
ByteBuffer buffer;
if (bufferPool != null)
buffer = bufferPool.acquire(capacity, direct);
else
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
return buffer;
} }
@Override @Override
public void close() public void close()
{ {
SuspendToken resume = null;
synchronized (this) synchronized (this)
{ {
closed = true; switch (state)
{
case CLOSED:
return;
case SUSPENDED:
resume = suspendToken;
suspendToken = null;
state = State.CLOSED;
break;
case RESUMED:
state = State.CLOSED;
break;
}
buffers.clear(); buffers.clear();
buffers.offer(EOF); buffers.offer(EOF);
} }
// Resume to discard util we reach next message. // May need to resume to discard until we reach next message.
resume(); if (resume != null)
resume.resume();
} }
@Override @Override
@ -157,7 +174,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
{ {
try try
{ {
if (closed) if (state == State.CLOSED)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Stream closed"); LOG.debug("Stream closed");
@ -194,7 +211,31 @@ public class MessageInputStream extends InputStream implements MessageAppender
int result = activeBuffer.get() & 0xFF; int result = activeBuffer.get() & 0xFF;
if (!activeBuffer.hasRemaining()) if (!activeBuffer.hasRemaining())
resume(); {
SuspendToken resume = null;
synchronized (this)
{
switch (state)
{
case CLOSED:
return -1;
case SUSPENDED:
resume = suspendToken;
suspendToken = null;
state = State.RESUMED;
break;
case RESUMED:
throw new IllegalStateException();
}
}
// Get more content to read.
if (resume != null)
resume.resume();
}
return result; return result;
} }
@ -207,21 +248,19 @@ public class MessageInputStream extends InputStream implements MessageAppender
} }
} }
private void resume()
{
SuspendToken resume;
synchronized (this)
{
resume = suspendToken;
suspendToken = null;
}
if (resume != null)
resume.resume();
}
@Override @Override
public void reset() throws IOException public void reset() throws IOException
{ {
throw new IOException("reset() not supported"); throw new IOException("reset() not supported");
} }
private ByteBuffer acquire(int capacity, boolean direct)
{
ByteBuffer buffer;
if (bufferPool != null)
buffer = bufferPool.acquire(capacity, direct);
else
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
return buffer;
}
} }