diff --git a/jetty-hazelcast/pom.xml b/jetty-hazelcast/pom.xml
index a52e26a1eb1..fb337b111a7 100644
--- a/jetty-hazelcast/pom.xml
+++ b/jetty-hazelcast/pom.xml
@@ -59,18 +59,6 @@
${project.version}
test
-
- org.eclipse.jetty.websocket
- websocket-servlet
- ${project.version}
- test
-
-
- org.eclipse.jetty.websocket
- websocket-jetty-server
- ${project.version}
- test
-
org.eclipse.jetty.toolchain
jetty-test-helper
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
index c64edc2b89e..be0b0dbd74b 100644
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
@@ -20,8 +20,10 @@ package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import jakarta.servlet.http.HttpServlet;
+
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
@@ -30,6 +32,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
+import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
@@ -69,10 +72,16 @@ public class AbstractTest
}
protected void start(ServerSessionListener listener) throws Exception
+ {
+ start(listener, x -> {});
+ }
+
+ protected void start(ServerSessionListener listener, Consumer configurator) throws Exception
{
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
+ configurator.accept(connectionFactory);
prepareServer(connectionFactory);
server.start();
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java
new file mode 100644
index 00000000000..2b82eda8b27
--- /dev/null
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/ConcurrentStreamCreationTest.java
@@ -0,0 +1,107 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under
+// the terms of the Eclipse Public License 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0
+//
+// This Source Code may also be made available under the following
+// Secondary Licenses when the conditions for such availability set
+// forth in the Eclipse Public License, v. 2.0 are satisfied:
+// the Apache License v2.0 which is available at
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.http2.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.api.server.ServerSessionListener;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConcurrentStreamCreationTest extends AbstractTest
+{
+ @Test
+ public void testConcurrentStreamCreation() throws Exception
+ {
+ int threads = 64;
+ int runs = 1;
+ int iterations = 1024;
+ int total = threads * runs * iterations;
+ CountDownLatch serverLatch = new CountDownLatch(total);
+ start(new ServerSessionListener.Adapter()
+ {
+ @Override
+ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
+ {
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
+ HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
+ stream.headers(responseFrame, Callback.NOOP);
+ serverLatch.countDown();
+ return null;
+ }
+ }, h2 -> h2.setMaxConcurrentStreams(total));
+
+ Session session = newClient(new Session.Listener.Adapter());
+
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ CountDownLatch clientLatch = new CountDownLatch(total);
+ CountDownLatch responseLatch = new CountDownLatch(runs);
+ Promise promise = new Promise.Adapter()
+ {
+ @Override
+ public void succeeded(Stream stream)
+ {
+ clientLatch.countDown();
+ }
+ };
+ IntStream.range(0, threads).forEach(i -> new Thread(() ->
+ {
+ try
+ {
+ barrier.await();
+ IntStream.range(0, runs).forEach(j ->
+ IntStream.range(0, iterations).forEach(k ->
+ {
+ MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
+ HeadersFrame requestFrame = new HeadersFrame(request, null, true);
+ session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame)
+ {
+ int status = ((MetaData.Response)frame.getMetaData()).getStatus();
+ if (status == HttpStatus.OK_200 && frame.isEndStream())
+ responseLatch.countDown();
+ }
+ });
+ }));
+ }
+ catch (Throwable x)
+ {
+ x.printStackTrace();
+ }
+ }).start());
+ assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total));
+ assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total));
+ assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total));
+ }
+}
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java
index 1b0423db1f3..af90b1f7bd6 100644
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java
@@ -196,7 +196,7 @@ public class StreamCloseTest extends AbstractTest
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
- PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", HttpFields.EMPTY));
+ PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), newRequest("GET", HttpFields.EMPTY));
stream.push(pushFrame, new Promise.Adapter()
{
@Override
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 6166aba962d..526244a97f0 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -22,10 +22,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -78,6 +80,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class);
private final ConcurrentMap streams = new ConcurrentHashMap<>();
+ private final StreamCreator streamCreator = new StreamCreator();
private final AtomicBiInteger streamCount = new AtomicBiInteger(); // Hi = closed, Lo = stream count
private final AtomicInteger localStreamIds = new AtomicInteger();
private final AtomicInteger lastRemoteStreamId = new AtomicInteger();
@@ -532,6 +535,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener)
{
+ streamCreator.newStream(frame, promise, listener);
+/*
try
{
// Synchronization is necessary to atomically create
@@ -555,6 +560,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
promise.failed(x);
}
+*/
}
/**
@@ -569,18 +575,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
*/
public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut)
{
- HeadersFrame frame = frameIn;
int streamId = frameIn.getStreamId();
if (streamId <= 0)
- {
streamId = localStreamIds.getAndAdd(2);
- PriorityFrame priority = frameIn.getPriority();
- priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
- priority.getWeight(), priority.isExclusive());
- frame = new HeadersFrame(streamId, frameIn.getMetaData(), priority, frameIn.isEndStream());
- }
+
+ HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn);
if (frameOut != null)
frameOut[0] = frame;
+
return createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
}
@@ -592,45 +594,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public int priority(PriorityFrame frame, Callback callback)
{
- int streamId = frame.getStreamId();
- IStream stream = streams.get(streamId);
- if (stream == null)
- {
- streamId = localStreamIds.getAndAdd(2);
- frame = new PriorityFrame(streamId, frame.getParentStreamId(),
- frame.getWeight(), frame.isExclusive());
- }
- control(stream, callback, frame);
- return streamId;
+ return streamCreator.priority(frame, callback);
}
@Override
public void push(IStream stream, Promise promise, PushPromiseFrame frame, Stream.Listener listener)
{
- try
- {
- // Synchronization is necessary to atomically create
- // the stream id and enqueue the frame to be sent.
- boolean queued;
- synchronized (this)
- {
- int streamId = localStreamIds.getAndAdd(2);
- frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
-
- IStream pushStream = createLocalStream(streamId, frame.getMetaData());
- pushStream.setListener(listener);
-
- ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
- queued = flusher.append(entry);
- }
- // Iterate outside the synchronized block.
- if (queued)
- flusher.iterate();
- }
- catch (Throwable x)
- {
- promise.failed(x);
- }
+ streamCreator.push(frame, promise, listener);
}
@Override
@@ -1731,4 +1701,167 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
terminate(failure);
}
}
+
+ /**
+ * SPEC: It is required that stream ids are monotonically increasing.
+ * Here we use a queue to atomically create the stream id and
+ * claim the slot in the queue. Concurrent threads will only
+ * flush up to the slot with a non-null entry to make sure
+ * frames are sent strictly in their stream id order.
+ * See https://tools.ietf.org/html/rfc7540#section-5.1.1.
+ */
+ private class StreamCreator
+ {
+ private final Queue slots = new ArrayDeque<>();
+ private Thread flushing;
+
+ private int priority(PriorityFrame frame, Callback callback)
+ {
+ Slot slot = new Slot();
+ int currentStreamId = frame.getStreamId();
+ int streamId = reserveSlot(slot, currentStreamId);
+
+ if (currentStreamId <= 0)
+ frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive());
+
+ assignSlotAndFlush(slot, new ControlEntry(frame, null, callback));
+ return streamId;
+ }
+
+ private void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener)
+ {
+ Slot slot = new Slot();
+ int currentStreamId = frame.getStreamId();
+ int streamId = reserveSlot(slot, currentStreamId);
+
+ frame = prepareHeadersFrame(streamId, frame);
+
+ try
+ {
+ IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
+ stream.setListener(listener);
+ stream.process(new PrefaceFrame(), Callback.NOOP);
+ assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
+ }
+ catch (Throwable x)
+ {
+ releaseSlotFlushAndFail(slot, promise, x);
+ }
+ }
+
+ private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame)
+ {
+ if (frame.getStreamId() <= 0)
+ {
+ PriorityFrame priority = frame.getPriority();
+ priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
+ frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
+ }
+ return frame;
+ }
+
+ private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener)
+ {
+ Slot slot = new Slot();
+ int streamId = reserveSlot(slot, 0);
+ frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
+
+ try
+ {
+ IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData());
+ stream.setListener(listener);
+ assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
+ }
+ catch (Throwable x)
+ {
+ releaseSlotFlushAndFail(slot, promise, x);
+ }
+ }
+
+ private void assignSlotAndFlush(Slot slot, ControlEntry entry)
+ {
+ // Every time a slot entry is assigned, we must flush.
+ slot.entry = entry;
+ flush();
+ }
+
+ private int reserveSlot(Slot slot, int streamId)
+ {
+ if (streamId <= 0)
+ {
+ synchronized (this)
+ {
+ streamId = localStreamIds.getAndAdd(2);
+ slots.offer(slot);
+ }
+ }
+ else
+ {
+ synchronized (this)
+ {
+ slots.offer(slot);
+ }
+ }
+ return streamId;
+ }
+
+ private void releaseSlotFlushAndFail(Slot slot, Promise promise, Throwable x)
+ {
+ synchronized (this)
+ {
+ slots.remove(slot);
+ }
+ flush();
+ promise.failed(x);
+ }
+
+ /**
+ * Flush goes over the entries of the slots queue to flush the entries,
+ * until either one of the following two conditions is true:
+ * - The queue is empty.
+ * - It reaches a slot with a null entry.
+ * When a slot with a null entry is encountered, this means a concurrent thread reserved a slot
+ * but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering
+ * the null entry must bail out and it is up to the concurrent thread to finish up flushing.
+ * Note that only one thread can flush at any one time, if two threads happen to call flush
+ * concurrently, one will do the work while the other will bail out, so it is safe that all
+ * threads call flush after they're done reserving a slot and setting the entry.
+ */
+ private void flush()
+ {
+ Thread thread = Thread.currentThread();
+ boolean queued = false;
+ while (true)
+ {
+ ControlEntry entry;
+ synchronized (this)
+ {
+ if (flushing == null)
+ flushing = thread;
+ else if (flushing != thread)
+ return; // Another thread is flushing.
+
+ Slot slot = slots.peek();
+ entry = slot == null ? null : slot.entry;
+
+ if (entry == null)
+ {
+ flushing = null;
+ // No more slots or null entry, so we may iterate on the flusher.
+ break;
+ }
+
+ slots.poll();
+ }
+ queued |= flusher.append(entry);
+ }
+ if (queued)
+ flusher.iterate();
+ }
+
+ private class Slot
+ {
+ private volatile ControlEntry entry;
+ }
+ }
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
index 4d7d9f2e6cd..ee19dffeb89 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
@@ -26,6 +26,11 @@ public class PushPromiseFrame extends Frame
private final int promisedStreamId;
private final MetaData.Request metaData;
+ public PushPromiseFrame(int streamId, MetaData.Request metaData)
+ {
+ this(streamId, 0, metaData);
+ }
+
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData)
{
super(FrameType.PUSH_PROMISE);
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
index 3fec7112c12..f8f3d5c6e0a 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
@@ -61,7 +61,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
- private boolean recycleHttpChannels;
+ private boolean recycleHttpChannels = true;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java
index ea2d38b8d64..b80ecddf5cd 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/PushedResourcesTest.java
@@ -64,7 +64,7 @@ public class PushedResourcesTest extends AbstractTest
{
HttpURI pushURI = HttpURI.from("http://localhost:" + connector.getLocalPort() + pushPath);
MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, HttpFields.EMPTY);
- stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter<>()
+ stream.push(new PushPromiseFrame(stream.getId(), pushRequest), new Promise.Adapter<>()
{
@Override
public void succeeded(Stream pushStream)
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
index 6c0de97c0f4..25ca2002064 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
@@ -94,7 +94,7 @@ public class HTTP2ServerConnection extends HTTP2Connection
private final AtomicLong totalResponses = new AtomicLong();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
- private boolean recycleHttpChannels;
+ private boolean recycleHttpChannels = true;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
index fe7e62dfae8..4e2f6367cbf 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
@@ -272,7 +272,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}", request);
- stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<>()
+ stream.push(new PushPromiseFrame(stream.getId(), request), new Promise<>()
{
@Override
public void succeeded(Stream pushStream)