Made sure that exceptions thrown by the parser are caught and the
connection closed.
This commit is contained in:
parent
aef2f42b5b
commit
c0e0b802d9
|
@ -74,79 +74,88 @@ public class Parser
|
|||
|
||||
public boolean parse(ByteBuffer buffer)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {}", buffer);
|
||||
|
||||
while (true)
|
||||
try
|
||||
{
|
||||
switch (state)
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {}", buffer);
|
||||
|
||||
while (true)
|
||||
{
|
||||
case HEADER:
|
||||
switch (state)
|
||||
{
|
||||
if (!headerParser.parse(buffer))
|
||||
return false;
|
||||
state = State.BODY;
|
||||
break;
|
||||
}
|
||||
case BODY:
|
||||
{
|
||||
int type = headerParser.getFrameType();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {} frame", FrameType.from(type));
|
||||
if (type < 0 || type >= bodyParsers.length)
|
||||
case HEADER:
|
||||
{
|
||||
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "unknown_frame_type_" + type);
|
||||
return false;
|
||||
}
|
||||
BodyParser bodyParser = bodyParsers[type];
|
||||
if (headerParser.getLength() == 0)
|
||||
{
|
||||
boolean async = bodyParser.emptyBody();
|
||||
reset();
|
||||
if (async)
|
||||
return true;
|
||||
if (!buffer.hasRemaining())
|
||||
if (!headerParser.parse(buffer))
|
||||
return false;
|
||||
state = State.BODY;
|
||||
break;
|
||||
}
|
||||
else
|
||||
case BODY:
|
||||
{
|
||||
BodyParser.Result result = bodyParser.parse(buffer);
|
||||
switch (result)
|
||||
int type = headerParser.getFrameType();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {} frame", FrameType.from(type));
|
||||
if (type < 0 || type >= bodyParsers.length)
|
||||
{
|
||||
case PENDING:
|
||||
{
|
||||
// Not enough bytes.
|
||||
return false;
|
||||
}
|
||||
case ASYNC:
|
||||
{
|
||||
// The content will be processed asynchronously, stop parsing;
|
||||
// the asynchronous operation will eventually resume parsing.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsed {} frame, asynchronous processing", FrameType.from(type));
|
||||
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "unknown_frame_type_" + type);
|
||||
return false;
|
||||
}
|
||||
BodyParser bodyParser = bodyParsers[type];
|
||||
if (headerParser.getLength() == 0)
|
||||
{
|
||||
boolean async = bodyParser.emptyBody();
|
||||
reset();
|
||||
if (async)
|
||||
return true;
|
||||
}
|
||||
case COMPLETE:
|
||||
if (!buffer.hasRemaining())
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
BodyParser.Result result = bodyParser.parse(buffer);
|
||||
switch (result)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsed {} frame, synchronous processing", FrameType.from(type));
|
||||
reset();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
case PENDING:
|
||||
{
|
||||
// Not enough bytes.
|
||||
return false;
|
||||
}
|
||||
case ASYNC:
|
||||
{
|
||||
// The content will be processed asynchronously, stop parsing;
|
||||
// the asynchronous operation will eventually resume parsing.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsed {} frame, asynchronous processing", FrameType.from(type));
|
||||
return true;
|
||||
}
|
||||
case COMPLETE:
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsed {} frame, synchronous processing", FrameType.from(type));
|
||||
reset();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.debug(x);
|
||||
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "parser_error");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected void notifyConnectionFailure(int error, String reason)
|
||||
|
|
|
@ -42,33 +42,42 @@ public class ServerParser extends Parser
|
|||
@Override
|
||||
public boolean parse(ByteBuffer buffer)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {}", buffer);
|
||||
|
||||
while (true)
|
||||
try
|
||||
{
|
||||
switch (state)
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsing {}", buffer);
|
||||
|
||||
while (true)
|
||||
{
|
||||
case PREFACE:
|
||||
switch (state)
|
||||
{
|
||||
if (!prefaceParser.parse(buffer))
|
||||
return false;
|
||||
if (onPreface())
|
||||
return true;
|
||||
state = State.FRAMES;
|
||||
break;
|
||||
}
|
||||
case FRAMES:
|
||||
{
|
||||
// Stay forever in the FRAMES state.
|
||||
return super.parse(buffer);
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
case PREFACE:
|
||||
{
|
||||
if (!prefaceParser.parse(buffer))
|
||||
return false;
|
||||
if (onPreface())
|
||||
return true;
|
||||
state = State.FRAMES;
|
||||
break;
|
||||
}
|
||||
case FRAMES:
|
||||
{
|
||||
// Stay forever in the FRAMES state.
|
||||
return super.parse(buffer);
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.debug(x);
|
||||
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "parser_error");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean onPreface()
|
||||
|
|
Loading…
Reference in New Issue