434685 - WebSocket read/parse does not discard remaining network buffer after unrecoverable error case
+ Make all future onFillable events after a fundamental read/parse exception discard the network buffer.
This commit is contained in:
parent
b515da6456
commit
832332aaad
|
@ -188,6 +188,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return countOnFillableEvents.get();
|
||||
}
|
||||
}
|
||||
|
||||
private static enum ReadMode
|
||||
{
|
||||
PARSE,
|
||||
DISCARD,
|
||||
EOF
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
|
||||
|
||||
|
@ -206,6 +213,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private WebSocketSession session;
|
||||
private List<ExtensionConfig> extensions;
|
||||
private boolean isFilling;
|
||||
private ReadMode readMode = ReadMode.PARSE;
|
||||
private IOState ioState;
|
||||
private Stats stats = new Stats();
|
||||
|
||||
|
@ -441,18 +449,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
LOG.debug("{} onFillable()",policy.getBehavior());
|
||||
stats.countOnFillableEvents.incrementAndGet();
|
||||
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
|
||||
boolean readMore = false;
|
||||
try
|
||||
{
|
||||
isFilling = true;
|
||||
readMore = (read(buffer) != -1);
|
||||
|
||||
if(readMode == ReadMode.PARSE)
|
||||
{
|
||||
readMode = readParse(buffer);
|
||||
} else
|
||||
{
|
||||
readMode = readDiscard(buffer);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
}
|
||||
|
||||
if (readMore && (suspendToken.get() == false))
|
||||
if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
|
||||
{
|
||||
fillInterested();
|
||||
}
|
||||
|
@ -462,6 +476,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected void onFillInterestedFailed(Throwable cause)
|
||||
{
|
||||
|
@ -521,7 +537,45 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
flusher.enqueue(frame,callback,batchMode);
|
||||
}
|
||||
|
||||
private int read(ByteBuffer buffer)
|
||||
private ReadMode readDiscard(ByteBuffer buffer)
|
||||
{
|
||||
EndPoint endPoint = getEndPoint();
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
int filled = endPoint.fill(buffer);
|
||||
if (filled == 0)
|
||||
{
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
else if (filled < 0)
|
||||
{
|
||||
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
|
||||
return ReadMode.EOF;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.ignore(e);
|
||||
return ReadMode.EOF;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.ignore(t);
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
}
|
||||
|
||||
private ReadMode readParse(ByteBuffer buffer)
|
||||
{
|
||||
EndPoint endPoint = getEndPoint();
|
||||
try
|
||||
|
@ -531,13 +585,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
int filled = endPoint.fill(buffer);
|
||||
if (filled == 0)
|
||||
{
|
||||
return 0;
|
||||
return ReadMode.PARSE;
|
||||
}
|
||||
else if (filled < 0)
|
||||
{
|
||||
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
|
||||
ioState.onReadFailure(new EOFException("Remote Read EOF"));
|
||||
return -1;
|
||||
return ReadMode.EOF;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -553,19 +607,20 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
LOG.warn(e);
|
||||
close(StatusCode.PROTOCOL,e.getMessage());
|
||||
return -1;
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
catch (CloseException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
close(e.getStatusCode(),e.getMessage());
|
||||
return -1;
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn(t);
|
||||
close(StatusCode.ABNORMAL,t.getMessage());
|
||||
return -1;
|
||||
// TODO: should probably only switch to discard if a non-ws-endpoint error
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
|
@ -42,10 +43,14 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AnnotatedMaxMessageSizeTest
|
||||
{
|
||||
@Rule
|
||||
public TestTracker tracker = new TestTracker();
|
||||
|
||||
private static Server server;
|
||||
private static ServerConnector connector;
|
||||
private static URI serverUri;
|
||||
|
@ -122,7 +127,8 @@ public class AnnotatedMaxMessageSizeTest
|
|||
client.expectUpgradeResponse();
|
||||
|
||||
// Generate text frame
|
||||
byte buf[] = new byte[90*1024]; // buffer bigger than maxMessageSize
|
||||
int size = 120 * 1024;
|
||||
byte buf[] = new byte[size]; // buffer bigger than maxMessageSize
|
||||
Arrays.fill(buf,(byte)'x');
|
||||
client.write(new TextFrame().setPayload(ByteBuffer.wrap(buf)));
|
||||
|
||||
|
|
Loading…
Reference in New Issue