From 48d68a49164041d2ece903735dca3c5493091ff5 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 5 Aug 2014 09:21:27 +1000 Subject: [PATCH 1/2] handle http/1 host header in http/2 --- .../java/org/eclipse/jetty/http2/hpack/MetaDataBuilder.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/jetty-http2/http2-hpack/src/main/java/org/eclipse/jetty/http2/hpack/MetaDataBuilder.java b/jetty-http2/http2-hpack/src/main/java/org/eclipse/jetty/http2/hpack/MetaDataBuilder.java index ac045a92d0d..2f12502695d 100644 --- a/jetty-http2/http2-hpack/src/main/java/org/eclipse/jetty/http2/hpack/MetaDataBuilder.java +++ b/jetty-http2/http2-hpack/src/main/java/org/eclipse/jetty/http2/hpack/MetaDataBuilder.java @@ -113,6 +113,12 @@ public class MetaDataBuilder _authority=(field instanceof HostPortHttpField)?((HostPortHttpField)field):new AuthorityHttpField(field.getValue()); break; + case HOST: + if (_authority==null) + _authority=(field instanceof HostPortHttpField)?((HostPortHttpField)field):new AuthorityHttpField(field.getValue()); + _fields.add(field); + break; + case C_PATH: _path = field.getValue(); break; From 636c7eaeae076b58d18afe691df8639acd187b41 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 5 Aug 2014 01:44:34 +0200 Subject: [PATCH 2/2] Fixed handling of max concurrent streams. There is a difference between the value set via configuration, that always refer to remote streams (streams initiated by remote peers), and the value received via SETTINGS frame, that always refer to local streams (streams initiated locally). --- .../jetty/http2/client/StreamCountTest.java | 188 ++++++++++++++++++ .../org/eclipse/jetty/http2/HTTP2Session.java | 58 ++++-- 2 files changed, 230 insertions(+), 16 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java new file mode 100644 index 00000000000..286687b2873 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java @@ -0,0 +1,188 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.junit.Assert; +import org.junit.Test; + +public class StreamCountTest extends AbstractTest +{ + @Test + public void testServersAllowsOneStreamEnforcedByClient() throws Exception + { + startServer(new ServerSessionListener.Adapter() + { + @Override + public Map onPreface(Session session) + { + Map settings = new HashMap<>(); + settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 1); + return settings; + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (frame.isEndStream()) + { + HttpFields fields = new HttpFields(); + MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields); + stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback); + } + } + }; + } + }); + + final CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + }); + + Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false); + FuturePromise streamPromise1 = new FuturePromise<>(); + final CountDownLatch responseLatch = new CountDownLatch(1); + session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + responseLatch.countDown(); + } + }); + Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS); + + HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false); + FuturePromise streamPromise2 = new FuturePromise<>(); + session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter()); + + try + { + streamPromise2.get(5, TimeUnit.SECONDS); + Assert.fail(); + } + catch (ExecutionException x) + { + // Expected + } + + stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter()); + Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testServersAllowsOneStreamEnforcedByServer() throws Exception + { + final CountDownLatch resetLatch = new CountDownLatch(1); + startServer(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + HTTP2Session session = (HTTP2Session)stream.getSession(); + session.setMaxRemoteStreams(1); + + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (frame.isEndStream()) + { + HttpFields fields = new HttpFields(); + MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields); + stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback); + } + } + }; + } + }); + + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onReset(Session session, ResetFrame frame) + { + resetLatch.countDown(); + } + }); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false); + FuturePromise streamPromise1 = new FuturePromise<>(); + final CountDownLatch responseLatch = new CountDownLatch(1); + session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + responseLatch.countDown(); + } + }); + + Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS); + + HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false); + FuturePromise streamPromise2 = new FuturePromise<>(); + session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter()); + + streamPromise2.get(5, TimeUnit.SECONDS); + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + + stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter()); + Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 79c89a55a4c..139d7afa860 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -77,7 +77,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener private final ConcurrentMap streams = new ConcurrentHashMap<>(); private final AtomicInteger streamIds = new AtomicInteger(); private final AtomicInteger lastStreamId = new AtomicInteger(); - private final AtomicInteger streamCount = new AtomicInteger(); + private final AtomicInteger localStreamCount = new AtomicInteger(); + private final AtomicInteger remoteStreamCount = new AtomicInteger(); private final AtomicInteger windowSize = new AtomicInteger(); private final AtomicBoolean closed = new AtomicBoolean(); private final Scheduler scheduler; @@ -86,7 +87,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener private final Listener listener; private final FlowControl flowControl; private final Flusher flusher; - private volatile int maxStreamCount; + private int maxLocalStreams; + private int maxRemoteStreams; public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId) { @@ -96,11 +98,22 @@ public abstract class HTTP2Session implements ISession, Parser.Listener this.listener = listener; this.flowControl = flowControl; this.flusher = new Flusher(4); - this.maxStreamCount = maxStreams; + this.maxLocalStreams = maxStreams; + this.maxRemoteStreams = maxStreams; this.streamIds.set(initialStreamId); this.windowSize.set(flowControl.getInitialWindowSize()); } + public int getMaxRemoteStreams() + { + return maxRemoteStreams; + } + + public void setMaxRemoteStreams(int maxRemoteStreams) + { + this.maxRemoteStreams = maxRemoteStreams; + } + public Generator getGenerator() { return generator; @@ -185,9 +198,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) { - maxStreamCount = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); + maxLocalStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); if (LOG.isDebugEnabled()) - LOG.debug("Updated max concurrent streams to {}", maxStreamCount); + LOG.debug("Updated max local concurrent streams to {}", maxLocalStreams); } if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE)) { @@ -295,7 +308,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } @Override - public void newStream(HeadersFrame frame, final Promise promise, Stream.Listener listener) + public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) { // Synchronization is necessary to atomically create // the stream id and enqueue the frame to be sent. @@ -306,12 +319,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(), priority.getWeight(), priority.isExclusive()); frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); - final IStream stream = createLocalStream(frame); + final IStream stream = createLocalStream(frame, promise); if (stream == null) - { - promise.failed(new IllegalStateException()); return; - } stream.updateClose(frame.isEndStream(), true); stream.setListener(listener); @@ -385,8 +395,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener flusher.iterate(); } - protected IStream createLocalStream(HeadersFrame frame) + protected IStream createLocalStream(HeadersFrame frame, Promise promise) { + while (true) + { + int localCount = localStreamCount.get(); + int maxCount = maxLocalStreams; + if (maxCount >= 0 && localCount >= maxCount) + { + promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + return null; + } + if (localStreamCount.compareAndSet(localCount, localCount + 1)) + break; + } + IStream stream = newStream(frame); int streamId = stream.getId(); if (streams.putIfAbsent(streamId, stream) == null) @@ -399,6 +422,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } else { + promise.failed(new IllegalStateException("Duplicate stream " + streamId)); return null; } } @@ -410,14 +434,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener // SPEC: exceeding max concurrent streams is treated as stream error. while (true) { - int currentStreams = streamCount.get(); - int maxStreams = maxStreamCount; - if (maxStreams >= 0 && currentStreams >= maxStreams) + int remoteCount = remoteStreamCount.get(); + int maxCount = getMaxRemoteStreams(); + if (maxCount >= 0 && remoteCount >= maxCount) { reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), disconnectOnFailure()); return null; } - if (streamCount.compareAndSet(currentStreams, currentStreams + 1)) + if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1)) break; } @@ -453,7 +477,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener assert removed == stream; if (local) - streamCount.decrementAndGet(); + localStreamCount.decrementAndGet(); + else + remoteStreamCount.decrementAndGet(); if (LOG.isDebugEnabled()) LOG.debug("Removed {}", stream);