From 6729a6f68907eeaa6e30919e6a1b1341ac04bf31 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Thu, 23 Apr 2020 11:37:02 +0200 Subject: [PATCH] Solve lock contention creating HTTP2 streams (#2188) while fixing race condition that breaks http channels recycling (#4695) Signed-off-by: Ludovic Orban --- .../jetty/http2/client/AbstractTest.java | 9 + .../client/ConcurrentStreamCreationTest.java | 107 +++++++++ .../jetty/http2/client/StreamCloseTest.java | 2 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 223 +++++++++++++----- .../jetty/http2/frames/PushPromiseFrame.java | 5 + .../client/http/HttpConnectionOverHTTP2.java | 2 +- .../client/http/PushedResourcesTest.java | 2 +- .../http2/server/HTTP2ServerConnection.java | 2 +- .../http2/server/HttpTransportOverHTTP2.java | 2 +- 9 files changed, 285 insertions(+), 69 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java index d3cd825ba26..55898830c17 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + import javax.servlet.http.HttpServlet; import org.eclipse.jetty.http.HostPortHttpField; @@ -30,6 +32,7 @@ import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; import org.eclipse.jetty.server.ConnectionFactory; @@ -69,10 +72,16 @@ public class AbstractTest } protected void start(ServerSessionListener listener) throws Exception + { + start(listener, x -> {}); + } + + protected void start(ServerSessionListener listener, Consumer configurator) throws Exception { RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener); connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + configurator.accept(connectionFactory); prepareServer(connectionFactory); server.start(); diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java new file mode 100644 index 00000000000..53de7b13375 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java @@ -0,0 +1,107 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConcurrentStreamCreationTest extends AbstractTest +{ + @Test + public void testConcurrentStreamCreation() throws Exception + { + int threads = 64; + int runs = 1; + int iterations = 1024; + int total = threads * runs * iterations; + CountDownLatch serverLatch = new CountDownLatch(total); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); + stream.headers(responseFrame, Callback.NOOP); + serverLatch.countDown(); + return null; + } + }, h2 -> h2.setMaxConcurrentStreams(total)); + + Session session = newClient(new Session.Listener.Adapter()); + + CyclicBarrier barrier = new CyclicBarrier(threads); + CountDownLatch clientLatch = new CountDownLatch(total); + CountDownLatch responseLatch = new CountDownLatch(runs); + Promise promise = new Promise.Adapter() + { + @Override + public void succeeded(Stream stream) + { + clientLatch.countDown(); + } + }; + IntStream.range(0, threads).forEach(i -> new Thread(() -> + { + try + { + barrier.await(); + IntStream.range(0, runs).forEach(j -> + IntStream.range(0, iterations).forEach(k -> + { + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame requestFrame = new HeadersFrame(request, null, true); + session.newStream(requestFrame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + int status = ((MetaData.Response)frame.getMetaData()).getStatus(); + if (status == HttpStatus.OK_200 && frame.isEndStream()) + responseLatch.countDown(); + } + }); + })); + } + catch (Throwable x) + { + x.printStackTrace(); + } + }).start()); + assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total)); + assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total)); + assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total)); + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index d5dce9df7b5..c2ecfc78bd2 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -196,7 +196,7 @@ public class StreamCloseTest extends AbstractTest @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", new HttpFields())); + PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), newRequest("GET", new HttpFields())); stream.push(pushFrame, new Promise.Adapter() { @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 2248ed0990e..b7a6aa21635 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -21,10 +21,12 @@ package org.eclipse.jetty.http2; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -74,6 +76,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private static final Logger LOG = Log.getLogger(HTTP2Session.class); private final ConcurrentMap streams = new ConcurrentHashMap<>(); + private final StreamCreator streamCreator = new StreamCreator(); private final AtomicInteger localStreamIds = new AtomicInteger(); private final AtomicInteger lastRemoteStreamId = new AtomicInteger(); private final AtomicInteger localStreamCount = new AtomicInteger(); @@ -516,80 +519,19 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) { - try - { - // Synchronization is necessary to atomically create - // the stream id and enqueue the frame to be sent. - boolean queued; - synchronized (this) - { - int streamId = frame.getStreamId(); - if (streamId <= 0) - { - streamId = localStreamIds.getAndAdd(2); - PriorityFrame priority = frame.getPriority(); - priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), - priority.getWeight(), priority.isExclusive()); - frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); - } - IStream stream = createLocalStream(streamId); - stream.setListener(listener); - - ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)); - queued = flusher.append(entry); - } - // Iterate outside the synchronized block. - if (queued) - flusher.iterate(); - } - catch (Throwable x) - { - promise.failed(x); - } + streamCreator.newStream(frame, promise, listener); } @Override public int priority(PriorityFrame frame, Callback callback) { - int streamId = frame.getStreamId(); - IStream stream = streams.get(streamId); - if (stream == null) - { - streamId = localStreamIds.getAndAdd(2); - frame = new PriorityFrame(streamId, frame.getParentStreamId(), - frame.getWeight(), frame.isExclusive()); - } - control(stream, callback, frame); - return streamId; + return streamCreator.priority(frame, callback); } @Override public void push(IStream stream, Promise promise, PushPromiseFrame frame, Stream.Listener listener) { - try - { - // Synchronization is necessary to atomically create - // the stream id and enqueue the frame to be sent. - boolean queued; - synchronized (this) - { - int streamId = localStreamIds.getAndAdd(2); - frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); - - IStream pushStream = createLocalStream(streamId); - pushStream.setListener(listener); - - ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream)); - queued = flusher.append(entry); - } - // Iterate outside the synchronized block. - if (queued) - flusher.iterate(); - } - catch (Throwable x) - { - promise.failed(x); - } + streamCreator.push(frame, promise, listener); } @Override @@ -1648,4 +1590,157 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio terminate(failure); } } + + /** + * SPEC: It is required that stream ids are monotonically increasing. + * Here we use a queue to atomically create the stream id and + * claim the slot in the queue. Concurrent threads will only + * flush up to the slot with a non-null entry to make sure + * frames are sent strictly in their stream id order. + * See https://tools.ietf.org/html/rfc7540#section-5.1.1. + */ + private class StreamCreator + { + private final Queue slots = new ArrayDeque<>(); + private Thread flushing; + + private int priority(PriorityFrame frame, Callback callback) + { + Slot slot = new Slot(); + int currentStreamId = frame.getStreamId(); + int streamId = reserveSlot(slot, currentStreamId); + + if (currentStreamId <= 0) + frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive()); + + slot.entry = new ControlEntry(frame, null, callback); + flush(); + return streamId; + } + + private void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) + { + Slot slot = new Slot(); + int currentStreamId = frame.getStreamId(); + int streamId = reserveSlot(slot, currentStreamId); + + if (currentStreamId <= 0) + { + PriorityFrame priority = frame.getPriority(); + priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive()); + frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); + } + + try + { + createLocalStream(slot, frame, promise, listener, streamId); + } + catch (Throwable x) + { + freeSlotAndFailPromise(slot, promise, x); + } + } + + private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) + { + Slot slot = new Slot(); + int streamId = reserveSlot(slot, 0); + frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); + + try + { + createLocalStream(slot, frame, promise, listener, streamId); + } + catch (Throwable x) + { + freeSlotAndFailPromise(slot, promise, x); + } + } + + private int reserveSlot(Slot slot, int streamId) + { + if (streamId <= 0) + { + synchronized (this) + { + streamId = localStreamIds.getAndAdd(2); + slots.offer(slot); + } + } + else + { + synchronized (this) + { + slots.offer(slot); + } + } + return streamId; + } + + private void createLocalStream(Slot slot, Frame frame, Promise promise, Stream.Listener listener, int streamId) + { + IStream stream = HTTP2Session.this.createLocalStream(streamId); + stream.setListener(listener); + slot.entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)); + flush(); + } + + private void freeSlotAndFailPromise(Slot slot, Promise promise, Throwable x) + { + synchronized (this) + { + slots.remove(slot); + } + flush(); + promise.failed(x); + } + + /** + * Flush goes over the entries of the slots queue to flush the entries, + * until either one of the following two conditions is true: + * - The queue is empty. + * - It reaches a slot with a null entry. + * When a slot with a null entry is encountered, this means a concurrent thread reserved a slot + * but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering + * the null entry must bail out and it is up to the concurrent thread to finish up flushing. + * Note that only one thread can flush at any one time, if two threads happen to call flush + * concurrently, one will do the work while the other will bail out, so it is safe that all + * threads call flush after they're done reserving a slot and setting the entry. + */ + private void flush() + { + Thread thread = Thread.currentThread(); + boolean queued = false; + while (true) + { + ControlEntry entry; + synchronized (this) + { + if (flushing == null) + flushing = thread; + else if (flushing != thread) + return; // another thread is flushing + + Slot slot = slots.peek(); + entry = slot == null ? null : slot.entry; + + if (entry == null) + { + flushing = null; + break; // No more slots or null entry, so we may iterate on the flusher + } + + slots.poll(); + } + queued |= flusher.append(entry); + } + if (queued) + flusher.iterate(); + } + + private class Slot + { + private volatile ControlEntry entry; + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java index 52553bb0ffc..1099b55fd08 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java @@ -26,6 +26,11 @@ public class PushPromiseFrame extends Frame private final int promisedStreamId; private final MetaData metaData; + public PushPromiseFrame(int streamId, MetaData metaData) + { + this(streamId, 0, metaData); + } + public PushPromiseFrame(int streamId, int promisedStreamId, MetaData metaData) { super(FrameType.PUSH_PROMISE); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 59b8a4c2b82..c8f99648517 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -51,7 +51,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; - private boolean recycleHttpChannels; + private boolean recycleHttpChannels = true; public HttpConnectionOverHTTP2(HttpDestination destination, Session session) { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java index 1e69393d084..267d112de1c 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java @@ -65,7 +65,7 @@ public class PushedResourcesTest extends AbstractTest { HttpURI pushURI = new HttpURI("http://localhost:" + connector.getLocalPort() + pushPath); MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, new HttpFields()); - stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter() + stream.push(new PushPromiseFrame(stream.getId(), pushRequest), new Promise.Adapter() { @Override public void succeeded(Stream pushStream) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index c8b1a24c7a3..963a453776f 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -93,7 +93,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection private final AtomicLong totalResponses = new AtomicLong(); private final ServerSessionListener listener; private final HttpConfiguration httpConfig; - private boolean recycleHttpChannels; + private boolean recycleHttpChannels = true; public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 330b6475043..6c9cc4343d2 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -255,7 +255,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (LOG.isDebugEnabled()) LOG.debug("HTTP/2 Push {}", request); - stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise() + stream.push(new PushPromiseFrame(stream.getId(), request), new Promise() { @Override public void succeeded(Stream pushStream)