diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java index cf8d3ddc135..f9b2f3d987d 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + import javax.servlet.http.HttpServlet; import org.eclipse.jetty.http.HostPortHttpField; @@ -30,6 +32,7 @@ import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; import org.eclipse.jetty.server.ConnectionFactory; @@ -69,10 +72,16 @@ public class AbstractTest } protected void start(ServerSessionListener listener) throws Exception + { + start(listener, x -> {}); + } + + protected void start(ServerSessionListener listener, Consumer configurator) throws Exception { RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener); connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + configurator.accept(connectionFactory); prepareServer(connectionFactory); server.start(); diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java new file mode 100644 index 00000000000..2b82eda8b27 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java @@ -0,0 +1,107 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConcurrentStreamCreationTest extends AbstractTest +{ + @Test + public void testConcurrentStreamCreation() throws Exception + { + int threads = 64; + int runs = 1; + int iterations = 1024; + int total = threads * runs * iterations; + CountDownLatch serverLatch = new CountDownLatch(total); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); + stream.headers(responseFrame, Callback.NOOP); + serverLatch.countDown(); + return null; + } + }, h2 -> h2.setMaxConcurrentStreams(total)); + + Session session = newClient(new Session.Listener.Adapter()); + + CyclicBarrier barrier = new CyclicBarrier(threads); + CountDownLatch clientLatch = new CountDownLatch(total); + CountDownLatch responseLatch = new CountDownLatch(runs); + Promise promise = new Promise.Adapter() + { + @Override + public void succeeded(Stream stream) + { + clientLatch.countDown(); + } + }; + IntStream.range(0, threads).forEach(i -> new Thread(() -> + { + try + { + barrier.await(); + IntStream.range(0, runs).forEach(j -> + IntStream.range(0, iterations).forEach(k -> + { + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + HeadersFrame requestFrame = new HeadersFrame(request, null, true); + session.newStream(requestFrame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + int status = ((MetaData.Response)frame.getMetaData()).getStatus(); + if (status == HttpStatus.OK_200 && frame.isEndStream()) + responseLatch.countDown(); + } + }); + })); + } + catch (Throwable x) + { + x.printStackTrace(); + } + }).start()); + assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total)); + assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total)); + assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total)); + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index 1b0423db1f3..af90b1f7bd6 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -196,7 +196,7 @@ public class StreamCloseTest extends AbstractTest @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", HttpFields.EMPTY)); + PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), newRequest("GET", HttpFields.EMPTY)); stream.push(pushFrame, new Promise.Adapter() { @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 6166aba962d..526244a97f0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -22,10 +22,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -78,6 +80,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class); private final ConcurrentMap streams = new ConcurrentHashMap<>(); + private final StreamCreator streamCreator = new StreamCreator(); private final AtomicBiInteger streamCount = new AtomicBiInteger(); // Hi = closed, Lo = stream count private final AtomicInteger localStreamIds = new AtomicInteger(); private final AtomicInteger lastRemoteStreamId = new AtomicInteger(); @@ -532,6 +535,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) { + streamCreator.newStream(frame, promise, listener); +/* try { // Synchronization is necessary to atomically create @@ -555,6 +560,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { promise.failed(x); } +*/ } /** @@ -569,18 +575,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio */ public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut) { - HeadersFrame frame = frameIn; int streamId = frameIn.getStreamId(); if (streamId <= 0) - { streamId = localStreamIds.getAndAdd(2); - PriorityFrame priority = frameIn.getPriority(); - priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), - priority.getWeight(), priority.isExclusive()); - frame = new HeadersFrame(streamId, frameIn.getMetaData(), priority, frameIn.isEndStream()); - } + + HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn); if (frameOut != null) frameOut[0] = frame; + return createLocalStream(streamId, (MetaData.Request)frame.getMetaData()); } @@ -592,45 +594,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public int priority(PriorityFrame frame, Callback callback) { - int streamId = frame.getStreamId(); - IStream stream = streams.get(streamId); - if (stream == null) - { - streamId = localStreamIds.getAndAdd(2); - frame = new PriorityFrame(streamId, frame.getParentStreamId(), - frame.getWeight(), frame.isExclusive()); - } - control(stream, callback, frame); - return streamId; + return streamCreator.priority(frame, callback); } @Override public void push(IStream stream, Promise promise, PushPromiseFrame frame, Stream.Listener listener) { - try - { - // Synchronization is necessary to atomically create - // the stream id and enqueue the frame to be sent. - boolean queued; - synchronized (this) - { - int streamId = localStreamIds.getAndAdd(2); - frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); - - IStream pushStream = createLocalStream(streamId, frame.getMetaData()); - pushStream.setListener(listener); - - ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream)); - queued = flusher.append(entry); - } - // Iterate outside the synchronized block. - if (queued) - flusher.iterate(); - } - catch (Throwable x) - { - promise.failed(x); - } + streamCreator.push(frame, promise, listener); } @Override @@ -1731,4 +1701,167 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio terminate(failure); } } + + /** + * SPEC: It is required that stream ids are monotonically increasing. + * Here we use a queue to atomically create the stream id and + * claim the slot in the queue. Concurrent threads will only + * flush up to the slot with a non-null entry to make sure + * frames are sent strictly in their stream id order. + * See https://tools.ietf.org/html/rfc7540#section-5.1.1. + */ + private class StreamCreator + { + private final Queue slots = new ArrayDeque<>(); + private Thread flushing; + + private int priority(PriorityFrame frame, Callback callback) + { + Slot slot = new Slot(); + int currentStreamId = frame.getStreamId(); + int streamId = reserveSlot(slot, currentStreamId); + + if (currentStreamId <= 0) + frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive()); + + assignSlotAndFlush(slot, new ControlEntry(frame, null, callback)); + return streamId; + } + + private void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) + { + Slot slot = new Slot(); + int currentStreamId = frame.getStreamId(); + int streamId = reserveSlot(slot, currentStreamId); + + frame = prepareHeadersFrame(streamId, frame); + + try + { + IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData()); + stream.setListener(listener); + stream.process(new PrefaceFrame(), Callback.NOOP); + assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream))); + } + catch (Throwable x) + { + releaseSlotFlushAndFail(slot, promise, x); + } + } + + private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame) + { + if (frame.getStreamId() <= 0) + { + PriorityFrame priority = frame.getPriority(); + priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive()); + frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); + } + return frame; + } + + private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) + { + Slot slot = new Slot(); + int streamId = reserveSlot(slot, 0); + frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); + + try + { + IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData()); + stream.setListener(listener); + assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream))); + } + catch (Throwable x) + { + releaseSlotFlushAndFail(slot, promise, x); + } + } + + private void assignSlotAndFlush(Slot slot, ControlEntry entry) + { + // Every time a slot entry is assigned, we must flush. + slot.entry = entry; + flush(); + } + + private int reserveSlot(Slot slot, int streamId) + { + if (streamId <= 0) + { + synchronized (this) + { + streamId = localStreamIds.getAndAdd(2); + slots.offer(slot); + } + } + else + { + synchronized (this) + { + slots.offer(slot); + } + } + return streamId; + } + + private void releaseSlotFlushAndFail(Slot slot, Promise promise, Throwable x) + { + synchronized (this) + { + slots.remove(slot); + } + flush(); + promise.failed(x); + } + + /** + * Flush goes over the entries of the slots queue to flush the entries, + * until either one of the following two conditions is true: + * - The queue is empty. + * - It reaches a slot with a null entry. + * When a slot with a null entry is encountered, this means a concurrent thread reserved a slot + * but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering + * the null entry must bail out and it is up to the concurrent thread to finish up flushing. + * Note that only one thread can flush at any one time, if two threads happen to call flush + * concurrently, one will do the work while the other will bail out, so it is safe that all + * threads call flush after they're done reserving a slot and setting the entry. + */ + private void flush() + { + Thread thread = Thread.currentThread(); + boolean queued = false; + while (true) + { + ControlEntry entry; + synchronized (this) + { + if (flushing == null) + flushing = thread; + else if (flushing != thread) + return; // Another thread is flushing. + + Slot slot = slots.peek(); + entry = slot == null ? null : slot.entry; + + if (entry == null) + { + flushing = null; + // No more slots or null entry, so we may iterate on the flusher. + break; + } + + slots.poll(); + } + queued |= flusher.append(entry); + } + if (queued) + flusher.iterate(); + } + + private class Slot + { + private volatile ControlEntry entry; + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java index 4d7d9f2e6cd..ee19dffeb89 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java @@ -26,6 +26,11 @@ public class PushPromiseFrame extends Frame private final int promisedStreamId; private final MetaData.Request metaData; + public PushPromiseFrame(int streamId, MetaData.Request metaData) + { + this(streamId, 0, metaData); + } + public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData) { super(FrameType.PUSH_PROMISE); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 3fec7112c12..f8f3d5c6e0a 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -61,7 +61,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; - private boolean recycleHttpChannels; + private boolean recycleHttpChannels = true; public HttpConnectionOverHTTP2(HttpDestination destination, Session session) { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java index d5bd31e2726..e37c1ec0b6d 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java @@ -64,7 +64,7 @@ public class PushedResourcesTest extends AbstractTest { HttpURI pushURI = HttpURI.from("http://localhost:" + connector.getLocalPort() + pushPath); MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, HttpFields.EMPTY); - stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter<>() + stream.push(new PushPromiseFrame(stream.getId(), pushRequest), new Promise.Adapter<>() { @Override public void succeeded(Stream pushStream) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 6c0de97c0f4..25ca2002064 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -94,7 +94,7 @@ public class HTTP2ServerConnection extends HTTP2Connection private final AtomicLong totalResponses = new AtomicLong(); private final ServerSessionListener listener; private final HttpConfiguration httpConfig; - private boolean recycleHttpChannels; + private boolean recycleHttpChannels = true; public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index fe7e62dfae8..4e2f6367cbf 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -272,7 +272,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (LOG.isDebugEnabled()) LOG.debug("HTTP/2 Push {}", request); - stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<>() + stream.push(new PushPromiseFrame(stream.getId(), request), new Promise<>() { @Override public void succeeded(Stream pushStream)