move logic from messageComplete into appendFrame with fin==true

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-10-14 10:26:08 +11:00
parent 09947681fe
commit 419eefc2ef

View File

@ -77,31 +77,42 @@ public class MessageInputStream extends InputStream implements MessageAppender
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload));
// Avoid entering synchronized block if there is nothing to do.
boolean bufferIsEmpty = BufferUtil.isEmpty(framePayload);
if (bufferIsEmpty && !fin)
return;
try try
{ {
if (BufferUtil.isEmpty(framePayload))
return;
synchronized (this) synchronized (this)
{ {
switch (state) if (!bufferIsEmpty)
{ {
case CLOSED: switch (state)
return; {
case CLOSED:
return;
case RESUMED: case RESUMED:
suspendToken = session.suspend(); suspendToken = session.suspend();
state = State.SUSPENDED; state = State.SUSPENDED;
break; break;
default: default:
throw new IllegalStateException(); throw new IllegalStateException();
}
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
} }
// Put the payload into the queue, by copying it. if (fin)
// Copying is necessary because the payload will {
// be processed after this method returns. buffers.add(EOF);
buffers.put(copy(framePayload)); state = State.COMPLETE;
}
} }
} }
catch (InterruptedException e) catch (InterruptedException e)
@ -131,32 +142,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
} }
} }
@Override
public void messageComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("Message completed");
synchronized (this)
{
switch (state)
{
case CLOSED:
return;
case SUSPENDED:
case RESUMED:
state = State.COMPLETE;
break;
default:
throw new IllegalStateException();
}
buffers.offer(EOF);
}
}
public void handlerComplete() public void handlerComplete()
{ {
// Close the InputStream. // Close the InputStream.
@ -202,6 +187,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
return -1; return -1;
} }
// todo: what if we get a buffer with no content and we never resume
// grab a fresh buffer // grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining()) while (activeBuffer == null || !activeBuffer.hasRemaining())
{ {
@ -279,6 +265,12 @@ public class MessageInputStream extends InputStream implements MessageAppender
} }
} }
@Override
public void messageComplete()
{
// We handle this case in appendFrame with fin==true.
}
@Override @Override
public void reset() throws IOException public void reset() throws IOException
{ {