diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 865379c68ac..195b836ca07 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -130,6 +130,7 @@ public class HTTP2Client extends ContainerLifeCycle private List protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14"); private int initialSessionRecvWindow = 16 * 1024 * 1024; private int initialStreamRecvWindow = 8 * 1024 * 1024; + private int maxConcurrentPushedStreams = 32; private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); @@ -336,6 +337,17 @@ public class HTTP2Client extends ContainerLifeCycle this.initialStreamRecvWindow = initialStreamRecvWindow; } + @ManagedAttribute("The max number of concurrent pushed streams") + public int getMaxConcurrentPushedStreams() + { + return maxConcurrentPushedStreams; + } + + public void setMaxConcurrentPushedStreams(int maxConcurrentPushedStreams) + { + this.maxConcurrentPushedStreams = maxConcurrentPushedStreams; + } + @ManagedAttribute("The max number of keys in all SETTINGS frames") public int getMaxSettingsKeys() { diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 87aaee0d837..23b7e2256b4 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.http2.client; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; @@ -53,7 +52,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory private final Connection.Listener connectionListener = new ConnectionListener(); @Override - public Connection newConnection(EndPoint endPoint, Map context) throws IOException + public Connection newConnection(EndPoint endPoint, Map context) { HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY); ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY); @@ -66,6 +65,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory Generator generator = new Generator(byteBufferPool); FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy(); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); + session.setMaxRemoteStreams(client.getMaxConcurrentPushedStreams()); Parser parser = new Parser(byteBufferPool, session, 4096, 8192); parser.setMaxSettingsKeys(client.getMaxSettingsKeys()); @@ -111,6 +111,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory if (settings == null) settings = new HashMap<>(); settings.computeIfAbsent(SettingsFrame.INITIAL_WINDOW_SIZE, k -> client.getInitialStreamRecvWindow()); + settings.computeIfAbsent(SettingsFrame.MAX_CONCURRENT_STREAMS, k -> client.getMaxConcurrentPushedStreams()); PrefaceFrame prefaceFrame = new PrefaceFrame(); SettingsFrame settingsFrame = new SettingsFrame(settings, false); diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/MaxPushedStreamsTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/MaxPushedStreamsTest.java new file mode 100644 index 00000000000..be26c7a8955 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/MaxPushedStreamsTest.java @@ -0,0 +1,130 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.stream.IntStream; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PushPromiseFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.junit.Assert; +import org.junit.Test; + +public class MaxPushedStreamsTest extends AbstractTest +{ + @Test + public void testMaxPushedStreams() throws Exception + { + int maxPushed = 2; + + CountDownLatch resetLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + // Trick the server into thinking it can push unlimited streams. + ((HTTP2Session)stream.getSession()).setMaxLocalStreams(-1); + + BiFunction, Stream, List> add = (l, s) -> { l.add(s); return l; }; + BinaryOperator> addAll = (l1, l2) -> { l1.addAll(l2); return l1; }; + CompletableFuture> result = CompletableFuture.completedFuture(new ArrayList<>()); + // Push maxPushed resources... + IntStream.range(0, maxPushed) + .mapToObj(i -> new PushPromiseFrame(stream.getId(), 0, newRequest("GET", "/push_" + i, new HttpFields()))) + .map(pushFrame -> + { + Promise.Completable promise = new Promise.Completable<>(); + stream.push(pushFrame, promise, new Stream.Listener.Adapter()); + return promise; + }) + // ... wait for the pushed streams... + .reduce(result, (cfList, cfStream) -> cfList.thenCombine(cfStream, add), + (cfList1, cfList2) -> cfList1.thenCombine(cfList2, addAll)) + // ... then push one extra stream, the client must reject it... + .thenApply(streams -> + { + PushPromiseFrame extraPushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", "/push_extra", new HttpFields())); + FuturePromise extraPromise = new FuturePromise<>(); + stream.push(extraPushFrame, extraPromise, new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + Assert.assertEquals(ErrorCode.REFUSED_STREAM_ERROR.code, frame.getError()); + resetLatch.countDown(); + } + }); + return streams; + }) + // ... then send the data for the valid pushed streams... + .thenAccept(streams -> streams.forEach(pushedStream -> + { + DataFrame data = new DataFrame(pushedStream.getId(), BufferUtil.EMPTY_BUFFER, true); + pushedStream.data(data, Callback.NOOP); + })) + // ... then send the response. + .thenRun(() -> + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + }); + return null; + } + }); + client.setMaxConcurrentPushedStreams(maxPushed); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + CountDownLatch responseLatch = new CountDownLatch(1); + session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + responseLatch.countDown(); + } + }); + + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } +}