Fixes #2730 - Limit concurrent HTTP/2 pushed resources.
Using the same mechanism we use to limit max concurrent streams. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
9eca404da2
commit
9f5a7d33b0
|
@ -130,6 +130,7 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
|
||||
private int initialSessionRecvWindow = 16 * 1024 * 1024;
|
||||
private int initialStreamRecvWindow = 8 * 1024 * 1024;
|
||||
private int maxConcurrentPushedStreams = 32;
|
||||
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
|
||||
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
|
||||
|
||||
|
@ -336,6 +337,17 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
this.initialStreamRecvWindow = initialStreamRecvWindow;
|
||||
}
|
||||
|
||||
@ManagedAttribute("The max number of concurrent pushed streams")
|
||||
public int getMaxConcurrentPushedStreams()
|
||||
{
|
||||
return maxConcurrentPushedStreams;
|
||||
}
|
||||
|
||||
public void setMaxConcurrentPushedStreams(int maxConcurrentPushedStreams)
|
||||
{
|
||||
this.maxConcurrentPushedStreams = maxConcurrentPushedStreams;
|
||||
}
|
||||
|
||||
@ManagedAttribute("The max number of keys in all SETTINGS frames")
|
||||
public int getMaxSettingsKeys()
|
||||
{
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -53,7 +52,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
private final Connection.Listener connectionListener = new ConnectionListener();
|
||||
|
||||
@Override
|
||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
|
||||
{
|
||||
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
|
||||
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
|
||||
|
@ -66,6 +65,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
Generator generator = new Generator(byteBufferPool);
|
||||
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
|
||||
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
|
||||
session.setMaxRemoteStreams(client.getMaxConcurrentPushedStreams());
|
||||
|
||||
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
|
||||
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());
|
||||
|
@ -111,6 +111,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
if (settings == null)
|
||||
settings = new HashMap<>();
|
||||
settings.computeIfAbsent(SettingsFrame.INITIAL_WINDOW_SIZE, k -> client.getInitialStreamRecvWindow());
|
||||
settings.computeIfAbsent(SettingsFrame.MAX_CONCURRENT_STREAMS, k -> client.getMaxConcurrentPushedStreams());
|
||||
|
||||
PrefaceFrame prefaceFrame = new PrefaceFrame();
|
||||
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MaxPushedStreamsTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
public void testMaxPushedStreams() throws Exception
|
||||
{
|
||||
int maxPushed = 2;
|
||||
|
||||
CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
// Trick the server into thinking it can push unlimited streams.
|
||||
((HTTP2Session)stream.getSession()).setMaxLocalStreams(-1);
|
||||
|
||||
BiFunction<List<Stream>, Stream, List<Stream>> add = (l, s) -> { l.add(s); return l; };
|
||||
BinaryOperator<List<Stream>> addAll = (l1, l2) -> { l1.addAll(l2); return l1; };
|
||||
CompletableFuture<List<Stream>> result = CompletableFuture.completedFuture(new ArrayList<>());
|
||||
// Push maxPushed resources...
|
||||
IntStream.range(0, maxPushed)
|
||||
.mapToObj(i -> new PushPromiseFrame(stream.getId(), 0, newRequest("GET", "/push_" + i, new HttpFields())))
|
||||
.map(pushFrame ->
|
||||
{
|
||||
Promise.Completable<Stream> promise = new Promise.Completable<>();
|
||||
stream.push(pushFrame, promise, new Stream.Listener.Adapter());
|
||||
return promise;
|
||||
})
|
||||
// ... wait for the pushed streams...
|
||||
.reduce(result, (cfList, cfStream) -> cfList.thenCombine(cfStream, add),
|
||||
(cfList1, cfList2) -> cfList1.thenCombine(cfList2, addAll))
|
||||
// ... then push one extra stream, the client must reject it...
|
||||
.thenApply(streams ->
|
||||
{
|
||||
PushPromiseFrame extraPushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", "/push_extra", new HttpFields()));
|
||||
FuturePromise<Stream> extraPromise = new FuturePromise<>();
|
||||
stream.push(extraPushFrame, extraPromise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
Assert.assertEquals(ErrorCode.REFUSED_STREAM_ERROR.code, frame.getError());
|
||||
resetLatch.countDown();
|
||||
}
|
||||
});
|
||||
return streams;
|
||||
})
|
||||
// ... then send the data for the valid pushed streams...
|
||||
.thenAccept(streams -> streams.forEach(pushedStream ->
|
||||
{
|
||||
DataFrame data = new DataFrame(pushedStream.getId(), BufferUtil.EMPTY_BUFFER, true);
|
||||
pushedStream.data(data, Callback.NOOP);
|
||||
}))
|
||||
// ... then send the response.
|
||||
.thenRun(() ->
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
client.setMaxConcurrentPushedStreams(maxPushed);
|
||||
|
||||
Session session = newClient(new Session.Listener.Adapter());
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
if (frame.isEndStream())
|
||||
responseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue