Issue #2679 - HTTP/2 Spec Compliance.

Fixed stream ID validation and stream state handling.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-07-03 15:51:14 +02:00
parent d35fa69e1f
commit 0ec8f312f6
4 changed files with 113 additions and 39 deletions

View File

@ -78,6 +78,7 @@ public class HTTP2ClientSession extends HTTP2Session
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
// HEADERS can be received for normal and pushed responses.
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
@ -96,7 +97,23 @@ public class HTTP2ClientSession extends HTTP2Session
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
LOG.debug("Stream #{} not found", streamId);
if ((streamId & 1) == 1)
{
// Normal stream.
// Headers or trailers arriving after
// the stream has been reset are ignored.
if (!isLocalStreamClosed(streamId))
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_headers_frame");
}
else
{
// Pushed stream.
// Headers or trailers arriving after
// the stream has been reset are ignored.
if (!isRemoteStreamClosed(streamId))
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_headers_frame");
}
}
}

View File

@ -72,8 +72,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger localStreamIds = new AtomicInteger();
private final AtomicInteger lastRemoteStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
private final AtomicBiInteger remoteStreamCount = new AtomicBiInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
@ -105,7 +105,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
this.flusher = new HTTP2Flusher(this);
this.maxLocalStreams = -1;
this.maxRemoteStreams = -1;
this.streamIds.set(initialStreamId);
this.localStreamIds.set(initialStreamId);
this.lastRemoteStreamId.set((initialStreamId & 0x01) == 0x01 ? 0 : -1);
this.streamIdleTimeout = endPoint.getIdleTimeout();
this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
@ -229,35 +230,44 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
LOG.debug("Received {}", frame);
int streamId = frame.getStreamId();
final IStream stream = getStream(streamId);
IStream stream = getStream(streamId);
// SPEC: the session window must be updated even if the stream is null.
// The flow control length includes the padding bytes.
final int flowControlLength = frame.remaining() + frame.padding();
int flowControlLength = frame.remaining() + frame.padding();
flowControl.onDataReceived(this, stream, flowControlLength);
if (stream != null)
{
if (getRecvWindow() < 0)
{
close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback);
}
onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback);
else
{
stream.process(frame, new DataCallback(callback, stream, flowControlLength));
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
LOG.debug("Stream #{} not found", streamId);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
callback.succeeded();
if (isRemoteStreamClosed(streamId))
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback);
}
}
protected boolean isLocalStreamClosed(int streamId)
{
return streamId <= localStreamIds.get();
}
protected boolean isRemoteStreamClosed(int streamId)
{
return streamId <= getLastRemoteStreamId();
}
@Override
public abstract void onHeaders(HeadersFrame frame);
@ -274,11 +284,19 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
IStream stream = getStream(frame.getStreamId());
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
{
stream.process(frame, new ResetCallback());
}
else
notifyReset(this, frame);
{
if (isRemoteStreamClosed(streamId))
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}
}
@Override
@ -449,7 +467,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
if (stream != null)
{
int streamSendWindow = stream.updateSendWindow(0);
if (overflows(streamSendWindow, windowDelta))
if (sumOverflows(streamSendWindow, windowDelta))
{
reset(new ResetFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR.code), Callback.NOOP);
}
@ -459,6 +477,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
onWindowUpdate(stream, frame);
}
}
else
{
if (!isRemoteStreamClosed(streamId))
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_window_update_frame");
}
}
}
else
@ -470,7 +493,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
else
{
int sessionSendWindow = updateSendWindow(0);
if (overflows(sessionSendWindow, windowDelta))
if (sumOverflows(sessionSendWindow, windowDelta))
onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "invalid_flow_control_window");
else
onWindowUpdate(null, frame);
@ -478,7 +501,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
private boolean overflows(int a, int b)
private boolean sumOverflows(int a, int b)
{
try
{
@ -494,7 +517,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void onConnectionFailure(int error, String reason)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason));
onConnectionFailure(error, reason, Callback.NOOP);
}
private void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
}
@Override
@ -510,7 +538,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int streamId = frame.getStreamId();
if (streamId <= 0)
{
streamId = streamIds.getAndAdd(2);
streamId = localStreamIds.getAndAdd(2);
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
priority.getWeight(), priority.isExclusive());
@ -539,7 +567,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream stream = streams.get(streamId);
if (stream == null)
{
streamId = streamIds.getAndAdd(2);
streamId = localStreamIds.getAndAdd(2);
frame = new PriorityFrame(streamId, frame.getParentStreamId(),
frame.getWeight(), frame.isExclusive());
}
@ -557,7 +585,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
boolean queued;
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
int streamId = localStreamIds.getAndAdd(2);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
IStream pushStream = createLocalStream(streamId);
@ -657,7 +685,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
reason = reason.substring(0, Math.min(reason.length(), 32));
payload = reason.getBytes(StandardCharsets.UTF_8);
}
return new GoAwayFrame(closeState, lastStreamId.get(), error, payload);
return new GoAwayFrame(closeState, getLastRemoteStreamId(), error, payload);
}
@Override
@ -764,7 +792,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// SPEC: duplicate stream is treated as connection error.
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
updateLastRemoteStreamId(streamId);
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
@ -773,7 +801,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
else
{
close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
return null;
}
}
@ -1042,9 +1070,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return !endPoint.isOpen();
}
private void updateLastStreamId(int streamId)
protected int getLastRemoteStreamId()
{
Atomics.updateMax(lastStreamId, streamId);
return lastRemoteStreamId.get();
}
private void updateLastRemoteStreamId(int streamId)
{
Atomics.updateMax(lastRemoteStreamId, streamId);
}
protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
@ -1506,11 +1539,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
private final int error;
private final String reason;
private final Callback callback;
private CloseCallback(int error, String reason)
private CloseCallback(int error, String reason, Callback callback)
{
this.error = error;
this.reason = reason;
this.callback = callback;
}
@Override
@ -1533,7 +1568,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void complete()
{
close(error, reason, Callback.NOOP);
close(error, reason, callback);
}
}

View File

@ -45,7 +45,7 @@ public class GoAwayGenerator extends FrameGenerator
public int generateGoAway(ByteBufferPool.Lease lease, int lastStreamId, int error, byte[] payload)
{
if (lastStreamId < 0)
throw new IllegalArgumentException("Invalid last stream id: " + lastStreamId);
lastStreamId = 0;
// The last streamId + the error code.
int fixedLength = 4 + 4;

View File

@ -83,16 +83,39 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
int streamId = frame.getStreamId();
if ((streamId & 1) != 1)
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_stream_id");
return;
}
IStream stream = getStream(streamId);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
IStream stream = createRemoteStream(frame.getStreamId());
if (stream != null)
if (stream == null)
{
onStreamOpened(stream);
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
if (isRemoteStreamClosed(streamId))
{
onConnectionFailure(ErrorCode.STREAM_CLOSED_ERROR.code, "unexpected_headers_frame");
}
else
{
stream = createRemoteStream(streamId);
if (stream != null)
{
onStreamOpened(stream);
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
}
}
}
else
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
}
}
else if (metaData.isResponse())
@ -102,8 +125,6 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
else
{
// Trailers.
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
{
stream.process(frame, Callback.NOOP);
@ -112,7 +133,8 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
LOG.debug("Stream #{} not found", streamId);
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_headers_frame");
}
}
}