Cleaning up parsing layer to be less finicky

This commit is contained in:
Joakim Erdfelt 2012-06-19 11:13:54 -07:00
parent 993c3788b9
commit c1f3ad8d0e
12 changed files with 371 additions and 246 deletions

View File

@ -2,17 +2,29 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class BinaryPayloadParser extends PayloadParser
{
private Parser baseParser;
import org.eclipse.jetty.websocket.frames.BinaryFrame;
public BinaryPayloadParser(Parser parser)
/**
* Parsing for the {@link BinaryFrame}.
*/
public class BinaryPayloadParser extends FrameParser<BinaryFrame>
{
private BinaryFrame frame;
public BinaryPayloadParser()
{
this.baseParser = parser;
super();
frame = new BinaryFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public BinaryFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buffer)
{
// TODO Auto-generated method stub
return false;
@ -21,8 +33,6 @@ public class BinaryPayloadParser extends PayloadParser
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -2,17 +2,29 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class ClosePayloadParser extends PayloadParser
{
private Parser baseParser;
import org.eclipse.jetty.websocket.frames.CloseFrame;
public ClosePayloadParser(Parser parser)
/**
* Parsing for the {@link CloseFrame}.
*/
public class ClosePayloadParser extends FrameParser<CloseFrame>
{
private CloseFrame frame;
public ClosePayloadParser()
{
this.baseParser = parser;
super();
frame = new CloseFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public CloseFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buffer)
{
// TODO Auto-generated method stub
return false;
@ -21,8 +33,7 @@ public class ClosePayloadParser extends PayloadParser
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -2,17 +2,29 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class ContinuationPayloadParser extends PayloadParser
{
private Parser baseParser;
import org.eclipse.jetty.websocket.frames.ContinuationFrame;
public ContinuationPayloadParser(Parser parser)
/**
* Parsing for the {@link ContinuationFrame}.
*/
public class ContinuationPayloadParser extends FrameParser<ContinuationFrame>
{
private ContinuationFrame frame;
public ContinuationPayloadParser()
{
this.baseParser = parser;
super();
frame = new ContinuationFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public ContinuationFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buf)
{
// TODO Auto-generated method stub
return false;
@ -21,7 +33,6 @@ public class ContinuationPayloadParser extends PayloadParser
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -0,0 +1,218 @@
package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.OpCode;
import org.eclipse.jetty.websocket.frames.BaseFrame;
/**
* Base Framing Protocol handling
*
* <pre>
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-------+-+-------------+-------------------------------+
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
* |I|S|S|S| (4) |A| (7) | (16/64) |
* |N|V|V|V| |S| | (if payload len==126/127) |
* | |1|2|3| |K| | |
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
* | Extended payload length continued, if payload len == 127 |
* + - - - - - - - - - - - - - - - +-------------------------------+
* | |Masking-key, if MASK set to 1 |
* +-------------------------------+-------------------------------+
* | Masking-key (continued) | Payload Data |
* +-------------------------------- - - - - - - - - - - - - - - - +
* : Payload Data continued ... :
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
* | Payload Data continued ... |
* +---------------------------------------------------------------+
* </pre>
*/
public abstract class FrameParser<T extends BaseFrame>
{
private enum State
{
PAYLOAD_LEN,
PAYLOAD_LEN_BYTES,
MASK,
MASK_BYTES,
PAYLOAD
}
private static final Logger LOG = Log.getLogger(FrameParser.class);
private State state = State.PAYLOAD_LEN;
private int length = 0;
private int cursor = 0;
/**
* The frame that is being parsed
*
* @return the frame that is being parsed. should always return an object (never null)
*/
public abstract T getFrame();
/**
* Initialize the base framing values.
*
* @param fin
* @param rsv1
* @param rsv2
* @param rsv3
* @param opcode
*/
public final void initFrame(boolean fin, boolean rsv1, boolean rsv2, boolean rsv3, OpCode opcode)
{
T frame = getFrame();
frame.setFin(fin);
frame.setRsv1(rsv1);
frame.setRsv2(rsv2);
frame.setRsv3(rsv3);
frame.setOpCode(opcode);
}
/**
* Parse the base framing protocol buffer.
* <p>
* Note the first byte (fin,rsv1,rsv2,rsv3,opcode) are parsed by the {@link Parser#parse(ByteBuffer)} method
* <p>
* Not overridable
*
* @param buffer
* the buffer to parse from.
* @return true if done parsing base framing protocol and ready for parsing of the payload. false if incomplete parsing of base framing protocol.
*/
public final boolean parseBaseFraming(ByteBuffer buffer)
{
LOG.debug("Parsing {} bytes",buffer.remaining());
while (buffer.hasRemaining())
{
switch (state)
{
case PAYLOAD_LEN:
{
byte b = buffer.get();
getFrame().setMasked((b & 0x80) != 0);
length = (byte)(0x7F & b);
if (b == 127)
{
// length 4 bytes (extended payload length)
if (buffer.remaining() >= 4)
{
length = buffer.getInt();
}
else
{
length = 0;
state = State.PAYLOAD_LEN_BYTES;
cursor = 4;
break; // continue onto next state
}
}
else if (b == 126)
{
// length 2 bytes (extended payload length)
if (buffer.remaining() >= 2)
{
length = buffer.getShort();
}
else
{
length = 0;
state = State.PAYLOAD_LEN_BYTES;
cursor = 2;
break; // continue onto next state
}
}
getFrame().setPayloadLength(length);
if (getFrame().isMasked())
{
state = State.MASK;
}
else
{
state = State.PAYLOAD;
}
break;
}
case PAYLOAD_LEN_BYTES:
{
byte b = buffer.get();
--cursor;
length |= (b & 0xFF) << (8 * cursor);
if (cursor == 0)
{
getFrame().setPayloadLength(length);
if (getFrame().isMasked())
{
state = State.MASK;
}
else
{
state = State.PAYLOAD;
}
}
break;
}
case MASK:
{
byte m[] = new byte[4];
getFrame().setMask(m);
if (buffer.remaining() >= 4)
{
buffer.get(m,0,4);
state = State.PAYLOAD;
}
else
{
state = State.MASK_BYTES;
cursor = 4;
}
break;
}
case MASK_BYTES:
{
byte b = buffer.get();
--cursor;
getFrame().getMask()[cursor] = b;
if (cursor == 0)
{
state = State.PAYLOAD;
}
break;
}
}
if (state == State.PAYLOAD)
{
return true;
}
}
return false;
}
/**
* Implementation specific parsing of a payload
*
* @param buffer
* the payload buffer
* @return true if payload is done reading, false if incomplete
*/
public abstract boolean parsePayload(ByteBuffer buffer);
/**
* Reset the frame and parser states
*/
public void reset() {
// reset parser
state = State.PAYLOAD_LEN;
// reset frame
getFrame().reset();
}
}

View File

@ -11,8 +11,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.OpCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.ControlFrame;
import org.eclipse.jetty.websocket.frames.DataFrame;
/**
* Parsing of a frame in WebSocket land.
@ -41,36 +39,14 @@ import org.eclipse.jetty.websocket.frames.DataFrame;
public class Parser {
public interface Listener extends EventListener
{
public static class Adapter implements Listener
{
@Override
public void onControlFrame(final ControlFrame frame)
{
}
@Override
public void onDataFrame(final DataFrame frame)
{
}
@Override
public void onWebSocketException(WebSocketException e)
{
}
}
public void onControlFrame(final ControlFrame frame);
public void onDataFrame(final DataFrame frame);
public void onFrame(final BaseFrame frame);
public void onWebSocketException(WebSocketException e);
}
private enum State
{
FINOP,
PAYLOAD_LEN,
PAYLOAD_LEN_BYTES,
MASK,
MASK_BYTES,
BASE_FRAMING,
PAYLOAD
}
@ -78,28 +54,23 @@ public class Parser {
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private State state = State.FINOP;
private final EnumMap<OpCode, PayloadParser> parsers = new EnumMap<>(OpCode.class);
private final EnumMap<OpCode, FrameParser> parsers = new EnumMap<>(OpCode.class);
// Holder for the values represented in the baseframe being parsed.
private BaseFrame baseframe = new BaseFrame();
private int length = 0;
private int cursor = 0;
private PayloadParser parser;
private FrameParser parser;
public Parser()
{
/*
* TODO: Investigate addition of decompression factory similar to SPDY work in situation of negotiated deflate extension?
*/
baseframe = new BaseFrame();
reset();
parsers.put(OpCode.CONTINUATION,new ContinuationPayloadParser(this));
parsers.put(OpCode.TEXT,new TextPayloadParser(this));
parsers.put(OpCode.BINARY,new BinaryPayloadParser(this));
parsers.put(OpCode.CLOSE,new ClosePayloadParser(this));
parsers.put(OpCode.PING,new PingPayloadParser(this));
parsers.put(OpCode.PONG,new PongPayloadParser(this));
parsers.put(OpCode.CONTINUATION,new ContinuationPayloadParser());
parsers.put(OpCode.TEXT,new TextPayloadParser());
parsers.put(OpCode.BINARY,new BinaryPayloadParser());
parsers.put(OpCode.CLOSE,new ClosePayloadParser());
parsers.put(OpCode.PING,new PingPayloadParser());
parsers.put(OpCode.PONG,new PongPayloadParser());
}
public void addListener(Listener listener)
@ -107,34 +78,14 @@ public class Parser {
listeners.add(listener);
}
protected BaseFrame getBaseFrame()
protected void notifyFrame(final BaseFrame f)
{
return baseframe;
}
protected void notifyControlFrame(final ControlFrame f)
{
LOG.debug("Notify Control Frame: {}",f);
LOG.debug("Notify Frame: {}",f);
for (Listener listener : listeners)
{
try
{
listener.onControlFrame(f);
}
catch (Throwable t)
{
LOG.warn(t);
}
}
}
protected void notifyDataFrame(final DataFrame frame) {
LOG.debug("Notify Data Frame: {}",frame);
for (Listener listener : listeners)
{
try
{
listener.onDataFrame(frame);
listener.onFrame(f);
}
catch (Throwable t)
{
@ -165,110 +116,31 @@ public class Parser {
// peek at byte
byte b = buffer.get();
byte flags = (byte)(0xF & (b >> 4));
baseframe.setFin((flags & BaseFrame.FLAG_FIN) == 1);
baseframe.setRsv1((flags & BaseFrame.FLAG_RSV1) == 1);
baseframe.setRsv2((flags & BaseFrame.FLAG_RSV2) == 1);
baseframe.setRsv3((flags & BaseFrame.FLAG_RSV3) == 1);
boolean fin = ((flags & BaseFrame.FLAG_FIN) == 1);
boolean rsv1 = ((flags & BaseFrame.FLAG_RSV1) == 1);
boolean rsv2 = ((flags & BaseFrame.FLAG_RSV2) == 1);
boolean rsv3 = ((flags & BaseFrame.FLAG_RSV3) == 1);
OpCode opcode = OpCode.from((byte)(b & 0xF));
baseframe.setOpCode(opcode);
if (opcode.isControlFrame() && !baseframe.isLastFrame())
if (opcode.isControlFrame() && !fin)
{
throw new WebSocketException("Fragmented Control Frame");
}
state = State.PAYLOAD_LEN;
break;
}
case PAYLOAD_LEN:
{
byte b = buffer.get();
baseframe.setMasked((b & 0x80) != 0);
length = (byte)(0x7F & b);
if (b == 127)
if (parser == null)
{
// length 4 bytes (extended payload length)
if (buffer.remaining() >= 4)
{
length = buffer.getInt();
}
else
{
length = 0;
state = State.PAYLOAD_LEN_BYTES;
cursor = 4;
break; // continue onto next state
}
}
else if (b == 126)
{
// length 2 bytes (extended payload length)
if (buffer.remaining() >= 2)
{
length = buffer.getShort();
}
else
{
length = 0;
state = State.PAYLOAD_LEN_BYTES;
cursor = 2;
break; // continue onto next state
}
}
baseframe.setPayloadLength(length);
if (baseframe.isMasked())
{
state = State.MASK;
}
else
{
state = State.PAYLOAD;
// Establish specific type parser and hand off to them.
parser = parsers.get(opcode);
parser.reset();
parser.initFrame(fin,rsv1,rsv2,rsv3,opcode);
}
state = State.BASE_FRAMING;
break;
}
case PAYLOAD_LEN_BYTES:
case BASE_FRAMING:
{
byte b = buffer.get();
--cursor;
length |= (b & 0xFF) << (8 * cursor);
if (cursor == 0)
{
baseframe.setPayloadLength(length);
if (baseframe.isMasked())
{
state = State.MASK;
}
else
{
state = State.PAYLOAD;
}
}
break;
}
case MASK:
{
byte m[] = new byte[4];
baseframe.setMask(m);
if (buffer.remaining() >= 4)
{
buffer.get(m,0,4);
state = State.PAYLOAD;
}
else
{
state = State.MASK_BYTES;
cursor = 4;
}
break;
}
case MASK_BYTES:
{
byte b = buffer.get();
--cursor;
baseframe.getMask()[cursor] = b;
if (cursor == 0)
if (parser.parseBaseFraming(buffer))
{
state = State.PAYLOAD;
}
@ -276,14 +148,9 @@ public class Parser {
}
case PAYLOAD:
{
if (parser == null)
{
// Establish specific type parser and hand off to them.
parser = parsers.get(baseframe.getOpCode());
}
if (parser.parse(buffer))
if (parser.parsePayload(buffer))
{
notifyFrame(parser.getFrame());
reset();
}
break;
@ -311,7 +178,7 @@ public class Parser {
public void reset()
{
state = State.FINOP;
parser.reset();
parser = null;
baseframe.reset();
}
}

View File

@ -1,9 +0,0 @@
package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public abstract class PayloadParser
{
public abstract boolean parse(ByteBuffer buffer);
public abstract void reset();
}

View File

@ -7,29 +7,28 @@ import org.eclipse.jetty.websocket.frames.PingFrame;
/**
* Parsing for the {@link PingFrame}.
*/
public class PingPayloadParser extends PayloadParser
public class PingPayloadParser extends FrameParser<PingFrame>
{
private Parser baseParser;
private PingFrame frame;
private ByteBuffer payload;
private int payloadLength;
public PingPayloadParser(Parser parser)
public PingPayloadParser()
{
this.baseParser = parser;
}
private void onPingFrame()
{
PingFrame ping = new PingFrame();
ping.copy(baseParser.getBaseFrame());
ping.setPayload(payload);
baseParser.notifyControlFrame(ping);
super();
frame = new PingFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public PingFrame getFrame()
{
payloadLength = baseParser.getBaseFrame().getPayloadLength();
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buffer)
{
payloadLength = getFrame().getPayloadLength();
while (buffer.hasRemaining())
{
if (payload == null)
@ -41,12 +40,12 @@ public class PingPayloadParser extends PayloadParser
int size = Math.min(payloadLength,buffer.remaining());
int limit = buffer.limit();
buffer.limit(buffer.position() + size);
ByteBuffer bytes = buffer.slice();
ByteBuffer bytes = buffer.slice(); // TODO: make sure reference to subsection is acceptable
buffer.limit(limit);
payload.put(bytes);
if (payload.position() >= payloadLength)
{
onPingFrame();
frame.setPayload(bytes);
return true;
}
}
@ -56,6 +55,7 @@ public class PingPayloadParser extends PayloadParser
@Override
public void reset()
{
super.reset();
payload = null;
}
}

View File

@ -2,17 +2,26 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class PongPayloadParser extends PayloadParser
{
private Parser baseParser;
import org.eclipse.jetty.websocket.frames.PongFrame;
public PongPayloadParser(Parser parser)
public class PongPayloadParser extends FrameParser<PongFrame>
{
private PongFrame frame;
public PongPayloadParser()
{
this.baseParser = parser;
super();
frame = new PongFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public PongFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buffer)
{
// TODO Auto-generated method stub
return false;
@ -21,8 +30,6 @@ public class PongPayloadParser extends PayloadParser
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -2,17 +2,26 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class TextPayloadParser extends PayloadParser
{
private Parser baseParser;
import org.eclipse.jetty.websocket.frames.TextFrame;
public TextPayloadParser(Parser parser)
public class TextPayloadParser extends FrameParser<TextFrame>
{
private TextFrame frame;
public TextPayloadParser()
{
this.baseParser = parser;
super();
frame = new TextFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public TextFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buf)
{
// TODO Auto-generated method stub
return false;
@ -21,8 +30,6 @@ public class TextPayloadParser extends PayloadParser
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -2,21 +2,34 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class UnknownPayloadParser extends PayloadParser
import org.eclipse.jetty.websocket.frames.BaseFrame;
public class UnknownPayloadParser extends FrameParser<BaseFrame>
{
private BaseFrame frame;
public UnknownPayloadParser()
{
super();
frame = new BaseFrame();
}
@Override
public boolean parse(ByteBuffer buffer)
public BaseFrame getFrame()
{
return frame;
}
@Override
public boolean parsePayload(ByteBuffer buffer)
{
// TODO Auto-generated method stub
return false;
}
@Override
public void reset()
{
// TODO Auto-generated method stub
super.reset();
}
}

View File

@ -8,8 +8,6 @@ import java.util.List;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.ControlFrame;
import org.eclipse.jetty.websocket.frames.DataFrame;
import org.junit.Assert;
public class FrameParseCapture implements Parser.Listener
@ -55,13 +53,7 @@ public class FrameParseCapture implements Parser.Listener
}
@Override
public void onControlFrame(ControlFrame frame)
{
frames.add(frame);
}
@Override
public void onDataFrame(DataFrame frame)
public void onFrame(BaseFrame frame)
{
frames.add(frame);
}

View File

@ -14,8 +14,6 @@ public class PingParserTest
TestLogging.enableDebug(Parser.class);
ByteBuffer buf = ByteBuffer.allocate(16);
// Raw bytes as found in RFC 6455, Section 5.7 - Examples
// Unmasked Ping request
buf.put(new byte[]
{ (byte)0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f });
buf.flip();