Simplified HTTP/2 parser and its listener.

It is not possible to perform asynchronous processing of the content
of DATA frames, because otherwise the parser has to stop, stalling
all other streams.
Parser.Listener methods were returning boolean in a vestigial attempt
to handle asynchronous data processing, and have now been converted to
return void.
This commit is contained in:
Simone Bordet 2015-02-12 19:43:40 +01:00
parent f7b9206b2e
commit 0677735550
35 changed files with 394 additions and 491 deletions

View File

@ -41,7 +41,7 @@ public class HTTP2ClientSession extends HTTP2Session
}
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -60,7 +60,6 @@ public class HTTP2ClientSession extends HTTP2Session
if (stream.isClosed())
removeStream(stream, false);
}
return false;
}
private void notifyHeaders(IStream stream, HeadersFrame frame)
@ -79,7 +78,7 @@ public class HTTP2ClientSession extends HTTP2Session
}
@Override
public boolean onPushPromise(PushPromiseFrame frame)
public void onPushPromise(PushPromiseFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -101,7 +100,6 @@ public class HTTP2ClientSession extends HTTP2Session
if (pushStream.isClosed())
removeStream(pushStream, false);
}
return false;
}
private Stream.Listener notifyPush(IStream stream, IStream pushStream, PushPromiseFrame frame)

View File

@ -133,10 +133,7 @@ public class HTTP2Connection extends AbstractConnection
if (looping)
{
while (buffer.hasRemaining())
{
if (parser.parse(buffer))
break;
}
parser.parse(buffer);
task = tasks.poll();
if (LOG.isDebugEnabled())

View File

@ -141,7 +141,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public boolean onData(final DataFrame frame)
public void onData(final DataFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -159,29 +159,28 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (getRecvWindow() < 0)
{
close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.Adapter.INSTANCE);
return false;
}
boolean result = stream.process(frame, new Callback()
else
{
@Override
public void succeeded()
stream.process(frame, new Callback()
{
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
@Override
public void succeeded()
{
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
@Override
public void failed(Throwable x)
{
// Consume also in case of failures, to free the
// session flow control window for other streams.
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
if (stream.isClosed())
removeStream(stream, false);
return result;
@Override
public void failed(Throwable x)
{
// Consume also in case of failures, to free the
// session flow control window for other streams.
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
if (stream.isClosed())
removeStream(stream, false);
}
}
else
{
@ -190,21 +189,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
return false;
}
}
@Override
public abstract boolean onHeaders(HeadersFrame frame);
public abstract void onHeaders(HeadersFrame frame);
@Override
public boolean onPriority(PriorityFrame frame)
public void onPriority(PriorityFrame frame)
{
return false;
}
@Override
public boolean onReset(ResetFrame frame)
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -217,15 +214,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
removeStream(stream, false);
return false;
}
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
if (frame.isReply())
return false;
return;
// Iterate over all settings
for (Map.Entry<Integer, Integer> entry : frame.getSettings().entrySet())
@ -247,7 +242,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (value != 0 && value != 1)
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
return false;
return;
}
pushEnabled = value == 1;
break;
@ -274,7 +269,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
return false;
return;
}
generator.setMaxFrameSize(value);
break;
@ -297,11 +292,10 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: SETTINGS frame MUST be replied.
SettingsFrame reply = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
settings(reply, Callback.Adapter.INSTANCE);
return false;
}
@Override
public boolean onPing(PingFrame frame)
public void onPing(PingFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -314,7 +308,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
PingFrame reply = new PingFrame(frame.getPayload(), true);
control(null, Callback.Adapter.INSTANCE, reply);
}
return false;
}
/**
@ -330,13 +323,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
* performing their actions.
*
* @param frame the GO_AWAY frame that has been received.
* @return whether the parsing will be resumed asynchronously
* @see #close(int, String, Callback)
* @see #onShutdown()
* @see #onIdleTimeout()
*/
@Override
public boolean onGoAway(final GoAwayFrame frame)
public void onGoAway(final GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -366,7 +358,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
notifyClose(HTTP2Session.this, frame);
}
}, new DisconnectFrame());
return false;
return;
}
break;
}
@ -374,14 +366,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (LOG.isDebugEnabled())
LOG.debug("Ignored {}, already closed", frame);
return false;
return;
}
}
}
}
@Override
public boolean onWindowUpdate(WindowUpdateFrame frame)
public void onWindowUpdate(WindowUpdateFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
@ -396,7 +388,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
onWindowUpdate(null, frame);
}
return false;
}
@Override

View File

@ -193,26 +193,30 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
@Override
public boolean process(Frame frame, Callback callback)
public void process(Frame frame, Callback callback)
{
notIdle();
switch (frame.getType())
{
case HEADERS:
{
return onHeaders((HeadersFrame)frame, callback);
onHeaders((HeadersFrame)frame, callback);
break;
}
case DATA:
{
return onData((DataFrame)frame, callback);
onData((DataFrame)frame, callback);
break;
}
case RST_STREAM:
{
return onReset((ResetFrame)frame, callback);
onReset((ResetFrame)frame, callback);
break;
}
case PUSH_PROMISE:
{
return onPush((PushPromiseFrame)frame, callback);
onPush((PushPromiseFrame)frame, callback);
break;
}
default:
{
@ -221,21 +225,19 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
}
private boolean onHeaders(HeadersFrame frame, Callback callback)
private void onHeaders(HeadersFrame frame, Callback callback)
{
updateClose(frame.isEndStream(), false);
callback.succeeded();
return false;
}
private boolean onData(DataFrame frame, Callback callback)
private void onData(DataFrame frame, Callback callback)
{
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback);
return true;
}
// SPEC: remotely closed streams must be replied with a reset.
@ -243,34 +245,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
callback.failed(new EOFException("stream_closed"));
return true;
}
if (isReset())
{
// Just drop the frame.
callback.failed(new IOException("stream_reset"));
return true;
}
updateClose(frame.isEndStream(), false);
notifyData(this, frame, callback);
return false;
}
private boolean onReset(ResetFrame frame, Callback callback)
private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
callback.succeeded();
notifyReset(this, frame);
return false;
}
private boolean onPush(PushPromiseFrame frame, Callback callback)
private void onPush(PushPromiseFrame frame, Callback callback)
{
updateClose(true, true);
callback.succeeded();
return false;
}
@Override
@ -337,8 +334,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return recvWindow.getAndAdd(delta);
}
@Override
public void close()
private void close()
{
closeState.set(CloseState.CLOSED);
onClose();

View File

@ -27,27 +27,77 @@ import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
/**
* <p>The SPI interface for implementing a HTTP/2 session.</p>
* <p>This class extends {@link Session} by adding the methods required to
* implement the HTTP/2 session functionalities.</p>
*/
public interface ISession extends Session
{
@Override
public IStream getStream(int streamId);
/**
* <p>Enqueues the given frames to be written to the connection.</p>
*
* @param stream the stream the frames belong to
* @param callback the callback that gets notified when the frames have been sent
* @param frame the first frame to enqueue
* @param frames additional frames to enqueue
*/
public void control(IStream stream, Callback callback, Frame frame, Frame... frames);
/**
* <p>Enqueues the given PUSH_PROMISE frame to be written to the connection.</p>
* <p>Differently from {@link #control(IStream, Callback, Frame, Frame...)}, this method
* generates atomically the stream id for the pushed stream.</p>
*
* @param stream the stream associated to the pushed stream
* @param promise the promise that gets notified of the pushed stream creation
* @param frame the PUSH_PROMISE frame to enqueue
*/
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame);
/**
* <p>Enqueues the given DATA frame to be written to the connection.</p>
*
* @param stream the stream the data frame belongs to
* @param callback the callback that gets notified when the frame has been sent
* @param frame the DATA frame to send
*/
public void data(IStream stream, Callback callback, DataFrame frame);
/**
* <p>Updates the session send window by the given {@code delta}.</p>
*
* @param delta the delta value (positive or negative) to add to the session send window
* @return the previous value of the session send window
*/
public int updateSendWindow(int delta);
/**
* <p>Updates the session receive window by the given {@code delta}.</p>
*
* @param delta the delta value (positive or negative) to add to the session receive window
* @return the previous value of the session receive window
*/
public int updateRecvWindow(int delta);
/**
* <p>Callback method invoked when the a WINDOW_UPDATE frame has been received.</p>
*
* @param stream the stream the window update belongs to, or null if the window update belongs to the session
* @param frame the WINDOW_UPDATE frame received
*/
public void onWindowUpdate(IStream stream, WindowUpdateFrame frame);
/**
* @return whether the push functionality is enabled
*/
public boolean isPushEnabled();
/**
* Callback invoked when the connection reads -1.
* <p>Callback invoked when the connection reads -1.</p>
*
* @see #onIdleTimeout()
* @see #close(int, String, Callback)
@ -55,7 +105,7 @@ public interface ISession extends Session
public void onShutdown();
/**
* Callback invoked when the idle timeout expires.
* <p>Callback invoked when the idle timeout expires.</p>
*
* @see #onShutdown()
* @see #close(int, String, Callback)

View File

@ -22,35 +22,73 @@ import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Callback;
/**
* <p>The SPI interface for implementing a HTTP/2 stream.</p>
* <p>This class extends {@link Stream} by adding the methods required to
* implement the HTTP/2 stream functionalities.</p>
*/
public interface IStream extends Stream
{
/**
* <p>The constant used as attribute key to store/retrieve the HTTP
* channel associated with this stream</p>
*
* @see #setAttribute(String, Object)
*/
public static final String CHANNEL_ATTRIBUTE = IStream.class.getName() + ".channel";
@Override
public ISession getSession();
/**
* @return the {@link Listener} associated with this stream
* @see #setListener(Listener)
*/
public Listener getListener();
/**
* @param listener the {@link Listener} associated with this stream
* @see #getListener()
*/
public void setListener(Listener listener);
public boolean process(Frame frame, Callback callback);
/**
* <p>Processes the given {@code frame}, belonging to this stream.</p>
*
* @param frame the frame to process
* @param callback the callback to complete when frame has been processed
*/
public void process(Frame frame, Callback callback);
/**
* Updates the close state of this stream.
* <p>Updates the close state of this stream.</p>
*
* @param update whether to update the close state
* @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame
* that ends the stream).
* @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame
* that ends the stream).
*/
public void updateClose(boolean update, boolean local);
/**
* @return the current value of the stream send window
*/
public int getSendWindow();
/**
* <p>Updates the stream send window by the given {@code delta}.</p>
*
* @param delta the delta value (positive or negative) to add to the stream send window
* @return the previous value of the stream send window
*/
public int updateSendWindow(int delta);
/**
* <p>Updates the stream receive window by the given {@code delta}.</p>
*
* @param delta the delta value (positive or negative) to add to the stream receive window
* @return the previous value of the stream receive window
*/
public int updateRecvWindow(int delta);
public void close();
}

View File

@ -35,6 +35,13 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>The base parser for the frame body of HTTP/2 frames.</p>
* <p>Subclasses implement {@link #parse(ByteBuffer)} to parse
* the frame specific body.</p>
*
* @see Parser
*/
public abstract class BodyParser
{
protected static final Logger LOG = Log.getLogger(BodyParser.class);
@ -48,13 +55,20 @@ public abstract class BodyParser
this.listener = listener;
}
public abstract Result parse(ByteBuffer buffer);
/**
* <p>Parses the body bytes in the given {@code buffer}; only the body
* bytes are consumed, therefore when this method returns, the buffer
* may contain unconsumed bytes.</p>
*
* @param buffer the buffer to parse
* @return true if the whole body bytes were parsed, false if not enough
* body bytes were present in the buffer
*/
public abstract boolean parse(ByteBuffer buffer);
protected boolean emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_frame");
return false;
connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_frame");
}
protected boolean hasFlag(int bit)
@ -82,139 +96,130 @@ public abstract class BodyParser
return headerParser.getLength();
}
protected boolean notifyData(DataFrame frame)
protected void notifyData(DataFrame frame)
{
try
{
return listener.onData(frame);
listener.onData(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyHeaders(HeadersFrame frame)
protected void notifyHeaders(HeadersFrame frame)
{
try
{
return listener.onHeaders(frame);
listener.onHeaders(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyPriority(PriorityFrame frame)
protected void notifyPriority(PriorityFrame frame)
{
try
{
return listener.onPriority(frame);
listener.onPriority(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyReset(ResetFrame frame)
protected void notifyReset(ResetFrame frame)
{
try
{
return listener.onReset(frame);
listener.onReset(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifySettings(SettingsFrame frame)
protected void notifySettings(SettingsFrame frame)
{
try
{
return listener.onSettings(frame);
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyPushPromise(PushPromiseFrame frame)
protected void notifyPushPromise(PushPromiseFrame frame)
{
try
{
return listener.onPushPromise(frame);
listener.onPushPromise(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyPing(PingFrame frame)
protected void notifyPing(PingFrame frame)
{
try
{
return listener.onPing(frame);
listener.onPing(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyGoAway(GoAwayFrame frame)
protected void notifyGoAway(GoAwayFrame frame)
{
try
{
return listener.onGoAway(frame);
listener.onGoAway(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected boolean notifyWindowUpdate(WindowUpdateFrame frame)
protected void notifyWindowUpdate(WindowUpdateFrame frame)
{
try
{
return listener.onWindowUpdate(frame);
listener.onWindowUpdate(frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
protected Result notifyConnectionFailure(int error, String reason)
protected boolean connectionFailure(ByteBuffer buffer, int error, String reason)
{
BufferUtil.clear(buffer);
notifyConnectionFailure(error, reason);
return false;
}
private void notifyConnectionFailure(int error, String reason)
{
try
{
listener.onConnectionFailure(error, reason);
return Result.ASYNC;
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return Result.ASYNC;
}
}
public enum Result
{
PENDING, ASYNC, COMPLETE
}
}

View File

@ -33,14 +33,10 @@ public class ContinuationBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
MetaData metaData = headerBlockParser.parse(buffer, getBodyLength());
if (metaData != null)
{
// TODO: CONTINUATION frames are not supported for now, we just parse them to keep HPACK happy.
return Result.ASYNC;
}
return Result.PENDING;
// TODO: CONTINUATION frames are not supported for now, we just parse them to keep HPACK happy.
return metaData != null;
}
}

View File

@ -45,19 +45,16 @@ public class DataBodyParser extends BodyParser
}
@Override
protected boolean emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
if (isPadding())
{
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
return false;
}
return onData(BufferUtil.EMPTY_BUFFER, false, 0);
connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
else
onData(BufferUtil.EMPTY_BUFFER, false, 0);
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
boolean loop = false;
while (buffer.hasRemaining() || loop)
@ -68,19 +65,10 @@ public class DataBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
length = getBodyLength();
if (isPadding())
{
state = State.PADDING_LENGTH;
}
else
{
state = State.DATA;
}
state = isPadding() ? State.PADDING_LENGTH : State.DATA;
break;
}
case PADDING_LENGTH:
@ -92,10 +80,7 @@ public class DataBodyParser extends BodyParser
state = State.DATA;
loop = length == 0;
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_data_frame_padding");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_data_frame_padding");
break;
}
case DATA:
@ -115,20 +100,14 @@ public class DataBodyParser extends BodyParser
loop = paddingLength == 0;
// Padding bytes include the bytes that define the
// padding length plus the actual padding bytes.
if (onData(slice, false, padding + paddingLength))
{
return Result.ASYNC;
}
onData(slice, false, padding + paddingLength);
}
else
{
// We got partial data, simulate a smaller frame, and stay in DATA state.
// No padding for these synthetic frames (even if we have read
// the padding length already), it will be accounted at the end.
if (onData(slice, true, 0))
{
return Result.ASYNC;
}
onData(slice, true, 0);
}
break;
}
@ -140,7 +119,7 @@ public class DataBodyParser extends BodyParser
if (paddingLength == 0)
{
reset();
return Result.COMPLETE;
return true;
}
break;
}
@ -150,13 +129,13 @@ public class DataBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private boolean onData(ByteBuffer buffer, boolean fragment, int padding)
private void onData(ByteBuffer buffer, boolean fragment, int padding)
{
DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream(), padding);
return notifyData(frame);
notifyData(frame);
}
private enum State

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.util.BufferUtil;
public class GoAwayBodyParser extends BodyParser
{
@ -49,7 +48,7 @@ public class GoAwayBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -70,10 +69,7 @@ public class GoAwayBodyParser extends BodyParser
state = State.ERROR;
length -= 4;
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
else
{
@ -89,19 +85,13 @@ public class GoAwayBodyParser extends BodyParser
lastStreamId += currByte << (8 * cursor);
--length;
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
if (cursor == 0)
{
lastStreamId &= 0x7F_FF_FF_FF;
state = State.ERROR;
if (length == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
break;
}
@ -113,14 +103,9 @@ public class GoAwayBodyParser extends BodyParser
state = State.PAYLOAD;
length -= 4;
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
if (length == 0)
{
return onGoAway(lastStreamId, error, null);
}
}
else
{
@ -136,17 +121,12 @@ public class GoAwayBodyParser extends BodyParser
error += currByte << (8 * cursor);
--length;
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
if (cursor == 0)
{
state = State.PAYLOAD;
if (length == 0)
{
return onGoAway(lastStreamId, error, null);
}
}
break;
}
@ -170,9 +150,7 @@ public class GoAwayBodyParser extends BodyParser
payload[payload.length - cursor] = buffer.get();
--cursor;
if (cursor == 0)
{
return onGoAway(lastStreamId, error, payload);
}
break;
}
default:
@ -181,14 +159,15 @@ public class GoAwayBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onGoAway(int lastStreamId, int error, byte[] payload)
private boolean onGoAway(int lastStreamId, int error, byte[] payload)
{
GoAwayFrame frame = new GoAwayFrame(lastStreamId, error, payload);
reset();
return notifyGoAway(frame) ? Result.ASYNC : Result.COMPLETE;
notifyGoAway(frame);
return true;
}
private enum State

View File

@ -22,6 +22,11 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.frames.Frame;
/**
* <p>The parser for the frame header of HTTP/2 frames.</p>
*
* @see Parser
*/
public class HeaderParser
{
private State state = State.LENGTH;
@ -44,12 +49,13 @@ public class HeaderParser
}
/**
* Parses the header bytes in the given {@code buffer}; only the header
* bytes are consumed, therefore the buffer may contain unconsumed bytes.
* <p>Parses the header bytes in the given {@code buffer}; only the header
* bytes are consumed, therefore when this method returns, the buffer may
* contain unconsumed bytes.</p>
*
* @param buffer the buffer to parse
* @return true if a whole header was parsed, false if not enough header
* bytes were present in the buffer
* @return true if the whole header bytes were parsed, false if not enough
* header bytes were present in the buffer
*/
public boolean parse(ByteBuffer buffer)
{
@ -59,8 +65,8 @@ public class HeaderParser
{
case LENGTH:
{
int octect = buffer.get() & 0xFF;
length = (length << 8) + octect;
int octet = buffer.get() & 0xFF;
length = (length << 8) + octet;
if (++cursor == 3)
{
length &= Frame.MAX_MAX_LENGTH;

View File

@ -56,16 +56,15 @@ public class HeadersBodyParser extends BodyParser
}
@Override
protected boolean emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
MetaData metaData = headerBlockParser.parse(BufferUtil.EMPTY_BUFFER, 0);
boolean result = onHeaders(0, 0, false, metaData);
onHeaders(0, 0, false, metaData);
reset();
return result;
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
boolean loop = false;
while (buffer.hasRemaining() || loop)
@ -76,17 +75,11 @@ public class HeadersBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
// For now we don't support HEADERS frames that don't have END_HEADERS.
if (!hasFlag(Flags.END_HEADERS))
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "unsupported_headers_frame");
}
return connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code, "unsupported_headers_frame");
length = getBodyLength();
@ -112,10 +105,7 @@ public class HeadersBodyParser extends BodyParser
state = hasFlag(Flags.PRIORITY) ? State.EXCLUSIVE : State.HEADERS;
loop = length == 0;
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame_padding");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame_padding");
break;
}
case EXCLUSIVE:
@ -136,10 +126,7 @@ public class HeadersBodyParser extends BodyParser
length -= 4;
state = State.WEIGHT;
if (length < 1)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
else
{
@ -155,19 +142,13 @@ public class HeadersBodyParser extends BodyParser
streamId += currByte << (8 * cursor);
--length;
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
if (cursor == 0)
{
streamId &= 0x7F_FF_FF_FF;
state = State.WEIGHT;
if (length < 1)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
break;
}
@ -187,10 +168,7 @@ public class HeadersBodyParser extends BodyParser
// TODO: optimize of paddingLength==0: reset() here.
state = State.PADDING;
loop = paddingLength == 0;
if (onHeaders(streamId, weight, exclusive, metaData))
{
return Result.ASYNC;
}
onHeaders(streamId, weight, exclusive, metaData);
}
break;
}
@ -202,7 +180,7 @@ public class HeadersBodyParser extends BodyParser
if (paddingLength == 0)
{
reset();
return Result.COMPLETE;
return true;
}
break;
}
@ -212,18 +190,16 @@ public class HeadersBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private boolean onHeaders(int streamId, int weight, boolean exclusive, MetaData metaData)
private void onHeaders(int streamId, int weight, boolean exclusive, MetaData metaData)
{
PriorityFrame priorityFrame = null;
if (hasFlag(Flags.PRIORITY))
{
priorityFrame = new PriorityFrame(streamId, getStreamId(), weight, exclusive);
}
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, priorityFrame, isEndStream());
return notifyHeaders(frame);
notifyHeaders(frame);
}
private enum State

View File

@ -37,6 +37,11 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>The HTTP/2 protocol parser.</p>
* <p>This parser makes use of the {@link HeaderParser} and of
* {@link BodyParser}s to parse HTTP/2 frames.</p>
*/
public class Parser
{
private static final Logger LOG = Log.getLogger(Parser.class);
@ -72,7 +77,18 @@ public class Parser
state = State.HEADER;
}
public boolean parse(ByteBuffer buffer)
/**
* <p>Parses the given {@code buffer} bytes and emit events to a {@link Listener}.</p>
* <p>When this method returns, the buffer may not be fully consumed, so invocations
* to this method should be wrapped in a loop:</p>
* <pre>
* while (buffer.hasRemaining())
* parser.parse(buffer);
* </pre>
*
* @param buffer the buffer to parse
*/
public void parse(ByteBuffer buffer)
{
try
{
@ -83,7 +99,7 @@ public class Parser
case HEADER:
{
if (!headerParser.parse(buffer))
return false;
return;
state = State.BODY;
break;
}
@ -94,50 +110,25 @@ public class Parser
LOG.debug("Parsing {} frame", FrameType.from(type));
if (type < 0 || type >= bodyParsers.length)
{
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unknown_frame_type_" + type);
BufferUtil.clear(buffer);
return false;
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unknown_frame_type_" + type);
return;
}
BodyParser bodyParser = bodyParsers[type];
if (headerParser.getLength() == 0)
{
boolean async = bodyParser.emptyBody(buffer);
bodyParser.emptyBody(buffer);
reset();
if (async)
return true;
if (!buffer.hasRemaining())
return false;
return;
}
else
{
BodyParser.Result result = bodyParser.parse(buffer);
switch (result)
{
case PENDING:
{
// Not enough bytes.
return false;
}
case ASYNC:
{
// The content will be processed asynchronously, stop parsing;
// the asynchronous operation will eventually resume parsing.
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame, asynchronous processing", FrameType.from(type));
return true;
}
case COMPLETE:
{
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame, synchronous processing", FrameType.from(type));
reset();
break;
}
default:
{
throw new IllegalStateException();
}
}
if (!bodyParser.parse(buffer))
return;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame", FrameType.from(type));
reset();
}
break;
}
@ -152,9 +143,8 @@ public class Parser
{
if (LOG.isDebugEnabled())
LOG.debug(x);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "parser_error");
BufferUtil.clear(buffer);
return false;
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "parser_error");
}
}
@ -172,80 +162,71 @@ public class Parser
public interface Listener
{
public boolean onData(DataFrame frame);
public void onData(DataFrame frame);
public boolean onHeaders(HeadersFrame frame);
public void onHeaders(HeadersFrame frame);
public boolean onPriority(PriorityFrame frame);
public void onPriority(PriorityFrame frame);
public boolean onReset(ResetFrame frame);
public void onReset(ResetFrame frame);
public boolean onSettings(SettingsFrame frame);
public void onSettings(SettingsFrame frame);
public boolean onPushPromise(PushPromiseFrame frame);
public void onPushPromise(PushPromiseFrame frame);
public boolean onPing(PingFrame frame);
public void onPing(PingFrame frame);
public boolean onGoAway(GoAwayFrame frame);
public void onGoAway(GoAwayFrame frame);
public boolean onWindowUpdate(WindowUpdateFrame frame);
public void onWindowUpdate(WindowUpdateFrame frame);
public void onConnectionFailure(int error, String reason);
public static class Adapter implements Listener
{
@Override
public boolean onData(DataFrame frame)
public void onData(DataFrame frame)
{
return false;
}
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
return false;
}
@Override
public boolean onPriority(PriorityFrame frame)
public void onPriority(PriorityFrame frame)
{
return false;
}
@Override
public boolean onReset(ResetFrame frame)
public void onReset(ResetFrame frame)
{
return false;
}
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
return false;
}
@Override
public boolean onPushPromise(PushPromiseFrame frame)
public void onPushPromise(PushPromiseFrame frame)
{
return false;
}
@Override
public boolean onPing(PingFrame frame)
public void onPing(PingFrame frame)
{
return false;
}
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
return false;
}
@Override
public boolean onWindowUpdate(WindowUpdateFrame frame)
public void onWindowUpdate(WindowUpdateFrame frame)
{
return false;
}
@Override

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.util.BufferUtil;
public class PingBodyParser extends BodyParser
{
@ -44,7 +43,7 @@ public class PingBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -54,16 +53,10 @@ public class PingBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() != 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_ping_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_ping_frame");
// SPEC: wrong body length is treated as connection error.
if (getBodyLength() != 8)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_ping_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_ping_frame");
state = State.PAYLOAD;
break;
}
@ -87,9 +80,7 @@ public class PingBodyParser extends BodyParser
payload[8 - cursor] = buffer.get();
--cursor;
if (cursor == 0)
{
return onPing(payload);
}
break;
}
default:
@ -98,14 +89,15 @@ public class PingBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onPing(byte[] payload)
private boolean onPing(byte[] payload)
{
PingFrame frame = new PingFrame(payload, hasFlag(Flags.ACK));
reset();
return notifyPing(frame) ? Result.ASYNC : Result.COMPLETE;
notifyPing(frame);
return true;
}
private enum State

View File

@ -45,8 +45,8 @@ public class PrefaceParser
int currByte = buffer.get();
if (currByte != PrefaceFrame.PREFACE_BYTES[cursor])
{
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_preface");
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_preface");
return false;
}
++cursor;

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.util.BufferUtil;
public class PriorityBodyParser extends BodyParser
{
@ -45,7 +44,7 @@ public class PriorityBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -55,16 +54,10 @@ public class PriorityBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_priority_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_priority_frame");
int length = getBodyLength();
if (length != 5)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_priority_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_priority_frame");
state = State.EXCLUSIVE;
break;
}
@ -115,14 +108,15 @@ public class PriorityBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onPriority(int streamId, int weight, boolean exclusive)
private boolean onPriority(int streamId, int weight, boolean exclusive)
{
PriorityFrame frame = new PriorityFrame(streamId, getStreamId(), weight, exclusive);
reset();
return notifyPriority(frame) ? Result.ASYNC : Result.COMPLETE;
notifyPriority(frame);
return true;
}
private enum State

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.util.BufferUtil;
public class PushPromiseBodyParser extends BodyParser
{
@ -51,7 +50,7 @@ public class PushPromiseBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
boolean loop = false;
while (buffer.hasRemaining() || loop)
@ -62,17 +61,11 @@ public class PushPromiseBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_push_promise_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_push_promise_frame");
// For now we don't support PUSH_PROMISE frames that don't have END_HEADERS.
if (!hasFlag(Flags.END_HEADERS))
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "unsupported_push_promise_frame");
}
return connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code, "unsupported_push_promise_frame");
length = getBodyLength();
@ -93,10 +86,7 @@ public class PushPromiseBodyParser extends BodyParser
length -= paddingLength;
state = State.STREAM_ID;
if (length < 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
break;
}
case STREAM_ID:
@ -123,10 +113,7 @@ public class PushPromiseBodyParser extends BodyParser
streamId += currByte << (8 * cursor);
--length;
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
if (cursor == 0)
{
streamId &= 0x7F_FF_FF_FF;
@ -142,10 +129,7 @@ public class PushPromiseBodyParser extends BodyParser
{
state = State.PADDING;
loop = paddingLength == 0;
if (onPushPromise(streamId, metaData))
{
return Result.ASYNC;
}
onPushPromise(streamId, metaData);
}
break;
}
@ -157,7 +141,7 @@ public class PushPromiseBodyParser extends BodyParser
if (paddingLength == 0)
{
reset();
return Result.COMPLETE;
return true;
}
break;
}
@ -167,13 +151,13 @@ public class PushPromiseBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private boolean onPushPromise(int streamId, MetaData metaData)
private void onPushPromise(int streamId, MetaData metaData)
{
PushPromiseFrame frame = new PushPromiseFrame(getStreamId(), streamId, metaData);
return notifyPushPromise(frame);
notifyPushPromise(frame);
}
private enum State

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.BufferUtil;
public class ResetBodyParser extends BodyParser
{
@ -43,7 +42,7 @@ public class ResetBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -53,16 +52,10 @@ public class ResetBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_rst_stream_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_rst_stream_frame");
int length = getBodyLength();
if (length != 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_rst_stream_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_rst_stream_frame");
state = State.ERROR;
break;
}
@ -85,9 +78,7 @@ public class ResetBodyParser extends BodyParser
--cursor;
error += currByte << (8 * cursor);
if (cursor == 0)
{
return onReset(error);
}
break;
}
default:
@ -96,14 +87,15 @@ public class ResetBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onReset(int error)
private boolean onReset(int error)
{
ResetFrame frame = new ResetFrame(getStreamId(), error);
reset();
return notifyReset(frame) ? Result.ASYNC : Result.COMPLETE;
notifyReset(frame);
return true;
}
private enum State

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -41,7 +42,7 @@ public class ServerParser extends Parser
}
@Override
public boolean parse(ByteBuffer buffer)
public void parse(ByteBuffer buffer)
{
try
{
@ -55,16 +56,16 @@ public class ServerParser extends Parser
case PREFACE:
{
if (!prefaceParser.parse(buffer))
return false;
if (onPreface())
return true;
return;
onPreface();
state = State.FRAMES;
break;
}
case FRAMES:
{
// Stay forever in the FRAMES state.
return super.parse(buffer);
super.parse(buffer);
return;
}
default:
{
@ -76,39 +77,37 @@ public class ServerParser extends Parser
catch (Throwable x)
{
LOG.debug(x);
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "parser_error");
return false;
}
}
protected boolean onPreface()
protected void onPreface()
{
return notifyPreface();
notifyPreface();
}
private boolean notifyPreface()
private void notifyPreface()
{
try
{
return listener.onPreface();
listener.onPreface();
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
public interface Listener extends Parser.Listener
{
public boolean onPreface();
public void onPreface();
public static class Adapter extends Parser.Listener.Adapter implements Listener
{
@Override
public boolean onPreface()
public void onPreface()
{
return false;
}
}
}

View File

@ -25,7 +25,6 @@ import java.util.Map;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -55,13 +54,13 @@ public class SettingsBodyParser extends BodyParser
}
@Override
protected boolean emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
return onSettings(new HashMap<Integer, Integer>()) == Result.ASYNC;
onSettings(new HashMap<Integer, Integer>());
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -71,10 +70,7 @@ public class SettingsBodyParser extends BodyParser
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() != 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_frame");
}
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_frame");
length = getBodyLength();
settings = new HashMap<>();
state = State.SETTING_ID;
@ -88,10 +84,7 @@ public class SettingsBodyParser extends BodyParser
state = State.SETTING_VALUE;
length -= 2;
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
else
{
@ -108,10 +101,7 @@ public class SettingsBodyParser extends BodyParser
settingId += currByte << (8 * cursor);
--length;
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
if (cursor == 0)
{
state = State.SETTING_VALUE;
@ -129,9 +119,7 @@ public class SettingsBodyParser extends BodyParser
state = State.SETTING_ID;
length -= 4;
if (length == 0)
{
return onSettings(settings);
}
}
else
{
@ -148,10 +136,7 @@ public class SettingsBodyParser extends BodyParser
settingValue += currByte << (8 * cursor);
--length;
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
if (cursor == 0)
{
if (LOG.isDebugEnabled())
@ -159,9 +144,7 @@ public class SettingsBodyParser extends BodyParser
settings.put(settingId, settingValue);
state = State.SETTING_ID;
if (length == 0)
{
return onSettings(settings);
}
}
break;
}
@ -171,14 +154,15 @@ public class SettingsBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onSettings(Map<Integer, Integer> settings)
private boolean onSettings(Map<Integer, Integer> settings)
{
SettingsFrame frame = new SettingsFrame(settings, hasFlag(Flags.ACK));
reset();
return notifySettings(frame) ? Result.ASYNC : Result.COMPLETE;
notifySettings(frame);
return true;
}
private enum State

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.BufferUtil;
public class WindowUpdateBodyParser extends BodyParser
{
@ -43,7 +42,7 @@ public class WindowUpdateBodyParser extends BodyParser
}
@Override
public Result parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -53,10 +52,7 @@ public class WindowUpdateBodyParser extends BodyParser
{
int length = getBodyLength();
if (length != 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_window_update_frame");
}
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR.code, "invalid_window_update_frame");
state = State.WINDOW_DELTA;
break;
}
@ -92,14 +88,15 @@ public class WindowUpdateBodyParser extends BodyParser
}
}
}
return Result.PENDING;
return false;
}
private Result onWindowUpdate(int windowDelta)
private boolean onWindowUpdate(int windowDelta)
{
WindowUpdateFrame frame = new WindowUpdateFrame(getStreamId(), windowDelta);
reset();
return notifyWindowUpdate(frame) ? Result.ASYNC : Result.COMPLETE;
notifyWindowUpdate(frame);
return true;
}
private enum State

View File

@ -93,10 +93,9 @@ public class DataGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onData(DataFrame frame)
public void onData(DataFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -125,10 +124,9 @@ public class DataGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onData(DataFrame frame)
public void onData(DataFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -44,10 +44,9 @@ public class GoAwayGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -86,10 +85,9 @@ public class GoAwayGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -56,10 +56,9 @@ public class HeadersGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -102,10 +101,9 @@ public class HeadersGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -44,10 +44,9 @@ public class PingGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPing(PingFrame frame)
public void onPing(PingFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -85,10 +84,9 @@ public class PingGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPing(PingFrame frame)
public void onPing(PingFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -43,10 +43,9 @@ public class PriorityGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPriority(PriorityFrame frame)
public void onPriority(PriorityFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -88,10 +87,9 @@ public class PriorityGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPriority(PriorityFrame frame)
public void onPriority(PriorityFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -50,10 +50,9 @@ public class PushPromiseGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPushPromise(PushPromiseFrame frame)
public void onPushPromise(PushPromiseFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -103,10 +102,9 @@ public class PushPromiseGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onPushPromise(PushPromiseFrame frame)
public void onPushPromise(PushPromiseFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -43,10 +43,9 @@ public class ResetGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onReset(ResetFrame frame)
public void onReset(ResetFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -84,10 +83,9 @@ public class ResetGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onReset(ResetFrame frame)
public void onReset(ResetFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -76,10 +76,9 @@ public class SettingsGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -145,10 +144,9 @@ public class SettingsGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -43,10 +43,9 @@ public class WindowUpdateGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onWindowUpdate(WindowUpdateFrame frame)
public void onWindowUpdate(WindowUpdateFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);
@ -84,10 +83,9 @@ public class WindowUpdateGenerateParseTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onWindowUpdate(WindowUpdateFrame frame)
public void onWindowUpdate(WindowUpdateFrame frame)
{
frames.add(frame);
return false;
}
}, 4096, 8192);

View File

@ -54,7 +54,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
@Override
public boolean onPreface()
public void onPreface()
{
// SPEC: send a SETTINGS frame upon receiving the preface.
Map<Integer, Integer> settings = notifyPreface(this);
@ -63,11 +63,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
SettingsFrame frame = new SettingsFrame(settings, false);
// TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client.
control(null, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY);
return false;
}
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
@ -87,14 +86,12 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
{
onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request");
}
return false;
}
@Override
public boolean onPushPromise(PushPromiseFrame frame)
public void onPushPromise(PushPromiseFrame frame)
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "push_promise");
return false;
}
private Map<Integer, Integer> notifyPreface(Session session)

View File

@ -146,7 +146,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void requestContent(DataFrame frame, final Callback callback)
{
// We must copy the data since we do not know when its bytes will be consumed.
// We must copy the data since we do not know when the
// application will consume its bytes (we queue them by
// calling onContent()), and we cannot stop the parsing
// since there may be frames for other streams.
final ByteBufferPool byteBufferPool = getByteBufferPool();
ByteBuffer original = frame.getData();
final ByteBuffer copy = byteBufferPool.acquire(original.remaining(), original.isDirect());

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
@ -88,7 +89,7 @@ public class CloseTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
try
{
@ -96,11 +97,10 @@ public class CloseTest extends AbstractServerTest
// receiving the response headers.
client.close();
closeLatch.countDown();
return false;
}
catch (IOException x)
{
return false;
throw new RuntimeIOException(x);
}
}
}, 4096, 8192);
@ -152,12 +152,11 @@ public class CloseTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
// Even if we sent the GO_AWAY immediately after the
// HEADERS, the server is able to send us the response.
responseLatch.countDown();
return false;
}
}, 4096, 8192);
@ -216,17 +215,15 @@ public class CloseTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
responseLatch.countDown();
return false;
}
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
closeLatch.countDown();
return false;
}
}, 4096, 8192);

View File

@ -70,10 +70,9 @@ public class HTTP2ServerTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
latch.countDown();
return false;
}
}, 4096, 8192);
@ -113,18 +112,16 @@ public class HTTP2ServerTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
latch.countDown();
return false;
}
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
frameRef.set(frame);
latch.countDown();
return false;
}
}, 4096, 8192);
@ -172,26 +169,23 @@ public class HTTP2ServerTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame)
{
latch.countDown();
return false;
}
@Override
public boolean onHeaders(HeadersFrame frame)
public void onHeaders(HeadersFrame frame)
{
headersRef.set(frame);
latch.countDown();
return false;
}
@Override
public boolean onData(DataFrame frame)
public void onData(DataFrame frame)
{
dataRef.set(frame);
latch.countDown();
return false;
}
}, 4096, 8192);
@ -233,11 +227,10 @@ public class HTTP2ServerTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
Assert.assertEquals(ErrorCode.FRAME_SIZE_ERROR.code, frame.getError());
latch.countDown();
return false;
}
}, 4096, 8192);
@ -270,11 +263,10 @@ public class HTTP2ServerTest extends AbstractServerTest
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onGoAway(GoAwayFrame frame)
public void onGoAway(GoAwayFrame frame)
{
Assert.assertEquals(ErrorCode.PROTOCOL_ERROR.code, frame.getError());
latch.countDown();
return false;
}
}, 4096, 8192);

View File

@ -1,5 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.LEVEL=INFO
#org.eclipse.jetty.http2.HTTP2Connection.LEVEL=DEBUG
#org.eclipse.jetty.http2.server.HttpChannelOverHTTP2.LEVEL=DEBUG
#org.eclipse.jetty.servlets.PushCacheFilter.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG