Issue #2349 - Review HTTP/2 max streams enforcement. (#2389)

* Issue #2349 - Review HTTP/2 max streams enforcement.

Changed the max concurrent remote streams enforcement algorithm.
It is now based on the stream count and the closing stream count,
updated atomically in a state machine in HTTP2Stream.

Fixed Javadoc.

Fixed close() method.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-03-28 04:10:11 +02:00 committed by Greg Wilkins
parent 56fc71a54a
commit 8cf15659be
6 changed files with 253 additions and 51 deletions

View File

@ -18,7 +18,56 @@
package org.eclipse.jetty.http2;
/**
* The set of close states for a stream or a session.
* <pre>
* rcv hc
* NOT_CLOSED ---------------&gt; REMOTELY_CLOSED
* | |
* gen| |gen
* hc| |hc
* | |
* v rcv hc v
* LOCALLY_CLOSING --------------&gt; CLOSING
* | |
* snd| |gen
* hc| |hc
* | |
* v rcv hc v
* LOCALLY_CLOSED ----------------&gt; CLOSED
* </pre>
*/
public enum CloseState
{
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
/**
* Fully open.
*/
NOT_CLOSED,
/**
* A half-close frame has been generated.
*/
LOCALLY_CLOSING,
/**
* A half-close frame has been generated and sent.
*/
LOCALLY_CLOSED,
/**
* A half-close frame has been received.
*/
REMOTELY_CLOSED,
/**
* A half-close frame has been received and a half-close frame has been generated, but not yet sent.
*/
CLOSING,
/**
* Fully closed.
*/
CLOSED;
public enum Event
{
RECEIVED,
BEFORE_SEND,
AFTER_SEND
}
}

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
@ -72,7 +73,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicBiInteger remoteStreamCount = new AtomicBiInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
@ -718,14 +719,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
int remoteCount = remoteStreamCount.get();
long encoded = remoteStreamCount.get();
int remoteCount = AtomicBiInteger.getHi(encoded);
int remoteClosing = AtomicBiInteger.getLo(encoded);
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount >= maxCount)
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
return null;
}
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing))
break;
}
@ -748,6 +751,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
void updateStreamCount(boolean local, int deltaStreams, int deltaClosing)
{
if (local)
localStreamCount.addAndGet(deltaStreams);
else
remoteStreamCount.add(deltaStreams, deltaClosing);
}
protected IStream newStream(int streamId, boolean local)
{
return new HTTP2Stream(scheduler, this, streamId, local);
@ -759,18 +770,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
boolean local = stream.isLocal();
if (local)
localStreamCount.decrementAndGet();
else
remoteStreamCount.decrementAndGet();
onStreamClosed(stream);
flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream);
}
}
@ -1167,7 +1170,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
beforeSend();
return true;
}
@ -1184,10 +1187,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
* sender, the action may have not been performed yet, causing the larger
* data to be rejected, when it should have been accepted.</p>
*/
private void prepare()
private void beforeSend()
{
switch (frame.getType())
{
case HEADERS:
{
HeadersFrame headersFrame = (HeadersFrame)frame;
stream.updateClose(headersFrame.isEndStream(), CloseState.Event.BEFORE_SEND);
break;
}
case SETTINGS:
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
@ -1213,7 +1222,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
onStreamOpened(stream);
HeadersFrame headersFrame = (HeadersFrame)frame;
if (stream.updateClose(headersFrame.isEndStream(), true))
if (stream.updateClose(headersFrame.isEndStream(), CloseState.Event.AFTER_SEND))
removeStream(stream);
break;
}
@ -1230,7 +1239,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// Pushed streams are implicitly remotely closed.
// They are closed when sending an end-stream DATA frame.
stream.updateClose(true, false);
stream.updateClose(true, CloseState.Event.RECEIVED);
break;
}
case GO_AWAY:
@ -1317,15 +1326,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int length = Math.min(dataBytes, window);
// Only one DATA frame is generated.
bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
DataFrame dataFrame = (DataFrame)frame;
bytes = frameBytes = generator.data(lease, dataFrame, length);
int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);
LOG.debug("Generated {}, length/window/data={}/{}/{}", dataFrame, written, window, dataBytes);
this.dataWritten = written;
this.dataBytes -= written;
flowControl.onDataSending(stream, written);
stream.updateClose(dataFrame.isEndStream(), CloseState.Event.BEFORE_SEND);
return true;
}
@ -1342,7 +1353,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// Only now we can update the close state
// and eventually remove the stream.
if (stream.updateClose(dataFrame.isEndStream(), true))
if (stream.updateClose(dataFrame.isEndStream(), CloseState.Event.AFTER_SEND))
removeStream(stream);
super.succeeded();
}

View File

@ -264,7 +264,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onHeaders(HeadersFrame frame, Callback callback)
{
if (updateClose(frame.isEndStream(), false))
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
callback.succeeded();
}
@ -295,7 +295,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
return;
}
if (updateClose(frame.isEndStream(), false))
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
notifyData(this, frame, callback);
}
@ -312,7 +312,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
// Pushed streams are implicitly locally closed.
// They are closed when receiving an end-stream DATA frame.
updateClose(true, true);
updateClose(true, CloseState.Event.AFTER_SEND);
callback.succeeded();
}
@ -322,14 +322,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
@Override
public boolean updateClose(boolean update, boolean local)
public boolean updateClose(boolean update, CloseState.Event event)
{
if (LOG.isDebugEnabled())
LOG.debug("Update close for {} close={} local={}", this, update, local);
LOG.debug("Update close for {} update={} event={}", this, update, event);
if (!update)
return false;
switch (event)
{
case RECEIVED:
return updateCloseAfterReceived();
case BEFORE_SEND:
return updateCloseBeforeSend();
case AFTER_SEND:
return updateCloseAfterSend();
default:
return false;
}
}
private boolean updateCloseAfterReceived()
{
while (true)
{
CloseState current = closeState.get();
@ -337,22 +352,79 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
case NOT_CLOSED:
{
CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
if (closeState.compareAndSet(current, newValue))
if (closeState.compareAndSet(current, CloseState.REMOTELY_CLOSED))
return false;
break;
}
case LOCALLY_CLOSING:
{
if (closeState.compareAndSet(current, CloseState.CLOSING))
{
updateStreamCount(0, 1);
return false;
}
break;
}
case LOCALLY_CLOSED:
{
if (local)
return false;
close();
return true;
}
default:
{
return false;
}
}
}
}
private boolean updateCloseBeforeSend()
{
while (true)
{
CloseState current = closeState.get();
switch (current)
{
case NOT_CLOSED:
{
if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING))
return false;
break;
}
case REMOTELY_CLOSED:
{
if (!local)
if (closeState.compareAndSet(current, CloseState.CLOSING))
{
updateStreamCount(0, 1);
return false;
}
break;
}
default:
{
return false;
}
}
}
}
private boolean updateCloseAfterSend()
{
while (true)
{
CloseState current = closeState.get();
switch (current)
{
case NOT_CLOSED:
case LOCALLY_CLOSING:
{
if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED))
return false;
break;
}
case REMOTELY_CLOSED:
case CLOSING:
{
close();
return true;
}
@ -389,9 +461,19 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public void close()
{
if (closeState.getAndSet(CloseState.CLOSED) != CloseState.CLOSED)
CloseState oldState = closeState.getAndSet(CloseState.CLOSED);
if (oldState != CloseState.CLOSED)
{
int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
updateStreamCount(-1, deltaClosing);
onClose();
}
}
private void updateStreamCount(int deltaStream, int deltaClosing)
{
((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
}
@Override
public void succeeded()

View File

@ -76,12 +76,10 @@ public interface IStream extends Stream, Closeable
* <p>Updates the close state of this stream.</p>
*
* @param update whether to update the close state
* @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame
* @param event the event that caused the close state update
* @return whether the stream has been fully closed by this invocation
*/
public boolean updateClose(boolean update, boolean local);
public boolean updateClose(boolean update, CloseState.Event event);
/**
* <p>Forcibly closes this stream.</p>

View File

@ -0,0 +1,42 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler
{
@Override
protected final void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
jettyRequest.setHandled(true);
service(target, jettyRequest, request, response);
}
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
}
}

View File

@ -18,11 +18,10 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -32,7 +31,6 @@ import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
@ -53,12 +51,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
public void testOneConcurrentStream() throws Exception
{
long sleep = 1000;
start(1, new AbstractHandler()
start(1, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
// Sleep a bit to allow the second request to be queued.
sleep(sleep);
}
@ -91,17 +88,42 @@ public class MaxConcurrentStreamsTest extends AbstractTest
Assert.assertTrue(latch.await(5 * sleep, TimeUnit.MILLISECONDS));
}
@Test
public void testManyIterationsWithConcurrentStreams() throws Exception
{
int concurrency = 1;
start(concurrency, new EmptyServerHandler());
int iterations = 50;
IntStream.range(0, concurrency).parallel().forEach(i ->
IntStream.range(0, iterations).forEach(j ->
{
try
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.path("/" + i + "_" + j)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
}
catch (Throwable x)
{
throw new RuntimeException(x);
}
})
);
}
@Test
public void testTwoConcurrentStreamsThirdWaits() throws Exception
{
int maxStreams = 2;
long sleep = 1000;
start(maxStreams, new AbstractHandler()
start(maxStreams, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
sleep(sleep);
}
});
@ -140,12 +162,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
public void testAbortedWhileQueued() throws Exception
{
long sleep = 1000;
start(1, new AbstractHandler()
start(1, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
sleep(sleep);
}
});
@ -170,12 +191,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
{
int maxConcurrent = 10;
long sleep = 500;
start(maxConcurrent, new AbstractHandler()
start(maxConcurrent, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
sleep(sleep);
}
});