Issue #5368 - changes from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-10-15 12:08:57 +11:00
parent 419eefc2ef
commit 680020dcb2
1 changed files with 23 additions and 14 deletions

View File

@ -53,9 +53,24 @@ public class MessageInputStream extends InputStream implements MessageAppender
private enum State
{
/**
* Open and waiting for a frame to be delivered in {@link #appendFrame(ByteBuffer, boolean)}.
*/
RESUMED,
/**
* We have suspended the session after reading a websocket frame but have not reached the end of the message.
*/
SUSPENDED,
/**
* We have received a frame with fin==true and have suspended until we are signaled that onMessage method exited.
*/
COMPLETE,
/**
* We have read to EOF or someone has called InputStream.close(), any further reads will result in reading -1.
*/
CLOSED
}
@ -99,13 +114,17 @@ public class MessageInputStream extends InputStream implements MessageAppender
break;
default:
throw new IllegalStateException();
throw new IllegalStateException("Incorrect State: " + state.name());
}
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect());
BufferUtil.clearToFill(copy);
copy.put(framePayload);
BufferUtil.flipToFlush(copy, 0);
buffers.put(copy);
}
if (fin)
@ -134,7 +153,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
(activeBuffer != null && activeBuffer.hasRemaining());
if (remainingContent)
LOG.warn("MessageInputStream closed without fully consuming content");
LOG.warn("MessageInputStream closed without fully consuming content {}", session);
state = State.CLOSED;
buffers.clear();
@ -187,7 +206,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
return -1;
}
// todo: what if we get a buffer with no content and we never resume
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
@ -245,7 +263,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
break;
case RESUMED:
throw new IllegalStateException();
throw new IllegalStateException("Incorrect State: " + state.name());
}
}
@ -289,15 +307,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
return false;
}
private ByteBuffer copy(ByteBuffer buffer)
{
ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect());
BufferUtil.clearToFill(copy);
copy.put(buffer);
BufferUtil.flipToFlush(copy, 0);
return copy;
}
private ByteBuffer acquire(int capacity, boolean direct)
{
ByteBuffer buffer;