Merge pull request #4803 from lorban/jetty-9.4.x-channel-recycling-h2

Solve lock contention creating HTTP2 streams (#2188) …
This commit is contained in:
Simone Bordet 2020-04-29 15:18:50 +02:00 committed by GitHub
commit ac97bc3b31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 285 additions and 69 deletions

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
@ -30,6 +32,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
@ -69,10 +72,16 @@ public class AbstractTest
}
protected void start(ServerSessionListener listener) throws Exception
{
start(listener, x -> {});
}
protected void start(ServerSessionListener listener, Consumer<AbstractHTTP2ServerConnectionFactory> configurator) throws Exception
{
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
configurator.accept(connectionFactory);
prepareServer(connectionFactory);
server.start();

View File

@ -0,0 +1,107 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConcurrentStreamCreationTest extends AbstractTest
{
@Test
public void testConcurrentStreamCreation() throws Exception
{
int threads = 64;
int runs = 1;
int iterations = 1024;
int total = threads * runs * iterations;
CountDownLatch serverLatch = new CountDownLatch(total);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
serverLatch.countDown();
return null;
}
}, h2 -> h2.setMaxConcurrentStreams(total));
Session session = newClient(new Session.Listener.Adapter());
CyclicBarrier barrier = new CyclicBarrier(threads);
CountDownLatch clientLatch = new CountDownLatch(total);
CountDownLatch responseLatch = new CountDownLatch(runs);
Promise<Stream> promise = new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
clientLatch.countDown();
}
};
IntStream.range(0, threads).forEach(i -> new Thread(() ->
{
try
{
barrier.await();
IntStream.range(0, runs).forEach(j ->
IntStream.range(0, iterations).forEach(k ->
{
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(request, null, true);
session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
int status = ((MetaData.Response)frame.getMetaData()).getStatus();
if (status == HttpStatus.OK_200 && frame.isEndStream())
responseLatch.countDown();
}
});
}));
}
catch (Throwable x)
{
x.printStackTrace();
}
}).start());
assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total));
assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total));
assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total));
}
}

View File

@ -196,7 +196,7 @@ public class StreamCloseTest extends AbstractTest
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", new HttpFields()));
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), newRequest("GET", new HttpFields()));
stream.push(pushFrame, new Promise.Adapter<Stream>()
{
@Override

View File

@ -21,10 +21,12 @@ package org.eclipse.jetty.http2;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -74,6 +76,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final StreamCreator streamCreator = new StreamCreator();
private final AtomicInteger localStreamIds = new AtomicInteger();
private final AtomicInteger lastRemoteStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
@ -516,80 +519,19 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
try
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = frame.getStreamId();
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
}
IStream stream = createLocalStream(streamId);
stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
}
streamCreator.newStream(frame, promise, listener);
}
@Override
public int priority(PriorityFrame frame, Callback callback)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream == null)
{
streamId = localStreamIds.getAndAdd(2);
frame = new PriorityFrame(streamId, frame.getParentStreamId(),
frame.getWeight(), frame.isExclusive());
}
control(stream, callback, frame);
return streamId;
return streamCreator.priority(frame, callback);
}
@Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{
try
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = localStreamIds.getAndAdd(2);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
IStream pushStream = createLocalStream(streamId);
pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
}
streamCreator.push(frame, promise, listener);
}
@Override
@ -1648,4 +1590,157 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
terminate(failure);
}
}
/**
* SPEC: It is required that stream ids are monotonically increasing.
* Here we use a queue to atomically create the stream id and
* claim the slot in the queue. Concurrent threads will only
* flush up to the slot with a non-null entry to make sure
* frames are sent strictly in their stream id order.
* See https://tools.ietf.org/html/rfc7540#section-5.1.1.
*/
private class StreamCreator
{
private final Queue<Slot> slots = new ArrayDeque<>();
private Thread flushing;
private int priority(PriorityFrame frame, Callback callback)
{
Slot slot = new Slot();
int currentStreamId = frame.getStreamId();
int streamId = reserveSlot(slot, currentStreamId);
if (currentStreamId <= 0)
frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive());
slot.entry = new ControlEntry(frame, null, callback);
flush();
return streamId;
}
private void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int currentStreamId = frame.getStreamId();
int streamId = reserveSlot(slot, currentStreamId);
if (currentStreamId <= 0)
{
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
}
try
{
createLocalStream(slot, frame, promise, listener, streamId);
}
catch (Throwable x)
{
freeSlotAndFailPromise(slot, promise, x);
}
}
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int streamId = reserveSlot(slot, 0);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
try
{
createLocalStream(slot, frame, promise, listener, streamId);
}
catch (Throwable x)
{
freeSlotAndFailPromise(slot, promise, x);
}
}
private int reserveSlot(Slot slot, int streamId)
{
if (streamId <= 0)
{
synchronized (this)
{
streamId = localStreamIds.getAndAdd(2);
slots.offer(slot);
}
}
else
{
synchronized (this)
{
slots.offer(slot);
}
}
return streamId;
}
private void createLocalStream(Slot slot, Frame frame, Promise<Stream> promise, Stream.Listener listener, int streamId)
{
IStream stream = HTTP2Session.this.createLocalStream(streamId);
stream.setListener(listener);
slot.entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
flush();
}
private void freeSlotAndFailPromise(Slot slot, Promise<Stream> promise, Throwable x)
{
synchronized (this)
{
slots.remove(slot);
}
flush();
promise.failed(x);
}
/**
* Flush goes over the entries of the slots queue to flush the entries,
* until either one of the following two conditions is true:
* - The queue is empty.
* - It reaches a slot with a null entry.
* When a slot with a null entry is encountered, this means a concurrent thread reserved a slot
* but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering
* the null entry must bail out and it is up to the concurrent thread to finish up flushing.
* Note that only one thread can flush at any one time, if two threads happen to call flush
* concurrently, one will do the work while the other will bail out, so it is safe that all
* threads call flush after they're done reserving a slot and setting the entry.
*/
private void flush()
{
Thread thread = Thread.currentThread();
boolean queued = false;
while (true)
{
ControlEntry entry;
synchronized (this)
{
if (flushing == null)
flushing = thread;
else if (flushing != thread)
return; // another thread is flushing
Slot slot = slots.peek();
entry = slot == null ? null : slot.entry;
if (entry == null)
{
flushing = null;
break; // No more slots or null entry, so we may iterate on the flusher
}
slots.poll();
}
queued |= flusher.append(entry);
}
if (queued)
flusher.iterate();
}
private class Slot
{
private volatile ControlEntry entry;
}
}
}

View File

@ -26,6 +26,11 @@ public class PushPromiseFrame extends Frame
private final int promisedStreamId;
private final MetaData metaData;
public PushPromiseFrame(int streamId, MetaData metaData)
{
this(streamId, 0, metaData);
}
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData metaData)
{
super(FrameType.PUSH_PROMISE);

View File

@ -51,7 +51,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels;
private boolean recycleHttpChannels = true;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{

View File

@ -65,7 +65,7 @@ public class PushedResourcesTest extends AbstractTest
{
HttpURI pushURI = new HttpURI("http://localhost:" + connector.getLocalPort() + pushPath);
MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, new HttpFields());
stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter<Stream>()
stream.push(new PushPromiseFrame(stream.getId(), pushRequest), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)

View File

@ -93,7 +93,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final AtomicLong totalResponses = new AtomicLong();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;
private boolean recycleHttpChannels = true;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{

View File

@ -255,7 +255,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}", request);
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
stream.push(new PushPromiseFrame(stream.getId(), request), new Promise<Stream>()
{
@Override
public void succeeded(Stream pushStream)