Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-04-29 17:07:25 +02:00
commit fa94cc2f6c
9 changed files with 300 additions and 46 deletions

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
@ -30,6 +32,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
@ -69,10 +72,16 @@ public class AbstractTest
}
protected void start(ServerSessionListener listener) throws Exception
{
start(listener, x -> {});
}
protected void start(ServerSessionListener listener, Consumer<AbstractHTTP2ServerConnectionFactory> configurator) throws Exception
{
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
configurator.accept(connectionFactory);
prepareServer(connectionFactory);
server.start();

View File

@ -0,0 +1,107 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConcurrentStreamCreationTest extends AbstractTest
{
@Test
public void testConcurrentStreamCreation() throws Exception
{
int threads = 64;
int runs = 1;
int iterations = 1024;
int total = threads * runs * iterations;
CountDownLatch serverLatch = new CountDownLatch(total);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
serverLatch.countDown();
return null;
}
}, h2 -> h2.setMaxConcurrentStreams(total));
Session session = newClient(new Session.Listener.Adapter());
CyclicBarrier barrier = new CyclicBarrier(threads);
CountDownLatch clientLatch = new CountDownLatch(total);
CountDownLatch responseLatch = new CountDownLatch(runs);
Promise<Stream> promise = new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
clientLatch.countDown();
}
};
IntStream.range(0, threads).forEach(i -> new Thread(() ->
{
try
{
barrier.await();
IntStream.range(0, runs).forEach(j ->
IntStream.range(0, iterations).forEach(k ->
{
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(request, null, true);
session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
int status = ((MetaData.Response)frame.getMetaData()).getStatus();
if (status == HttpStatus.OK_200 && frame.isEndStream())
responseLatch.countDown();
}
});
}));
}
catch (Throwable x)
{
x.printStackTrace();
}
}).start());
assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total));
assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total));
assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total));
}
}

View File

@ -196,7 +196,7 @@ public class StreamCloseTest extends AbstractTest
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", HttpFields.EMPTY));
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), newRequest("GET", HttpFields.EMPTY));
stream.push(pushFrame, new Promise.Adapter<Stream>()
{
@Override

View File

@ -22,10 +22,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -78,6 +80,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class);
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final StreamCreator streamCreator = new StreamCreator();
private final AtomicBiInteger streamCount = new AtomicBiInteger(); // Hi = closed, Lo = stream count
private final AtomicInteger localStreamIds = new AtomicInteger();
private final AtomicInteger lastRemoteStreamId = new AtomicInteger();
@ -532,6 +535,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frame, promise, listener);
/*
try
{
// Synchronization is necessary to atomically create
@ -555,6 +560,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
promise.failed(x);
}
*/
}
/**
@ -569,18 +575,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
*/
public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut)
{
HeadersFrame frame = frameIn;
int streamId = frameIn.getStreamId();
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
PriorityFrame priority = frameIn.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frameIn.getMetaData(), priority, frameIn.isEndStream());
}
HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn);
if (frameOut != null)
frameOut[0] = frame;
return createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
}
@ -592,45 +594,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public int priority(PriorityFrame frame, Callback callback)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream == null)
{
streamId = localStreamIds.getAndAdd(2);
frame = new PriorityFrame(streamId, frame.getParentStreamId(),
frame.getWeight(), frame.isExclusive());
}
control(stream, callback, frame);
return streamId;
return streamCreator.priority(frame, callback);
}
@Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{
try
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = localStreamIds.getAndAdd(2);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
IStream pushStream = createLocalStream(streamId, frame.getMetaData());
pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
}
streamCreator.push(frame, promise, listener);
}
@Override
@ -1731,4 +1701,167 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
terminate(failure);
}
}
/**
* SPEC: It is required that stream ids are monotonically increasing.
* Here we use a queue to atomically create the stream id and
* claim the slot in the queue. Concurrent threads will only
* flush up to the slot with a non-null entry to make sure
* frames are sent strictly in their stream id order.
* See https://tools.ietf.org/html/rfc7540#section-5.1.1.
*/
private class StreamCreator
{
private final Queue<Slot> slots = new ArrayDeque<>();
private Thread flushing;
private int priority(PriorityFrame frame, Callback callback)
{
Slot slot = new Slot();
int currentStreamId = frame.getStreamId();
int streamId = reserveSlot(slot, currentStreamId);
if (currentStreamId <= 0)
frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive());
assignSlotAndFlush(slot, new ControlEntry(frame, null, callback));
return streamId;
}
private void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int currentStreamId = frame.getStreamId();
int streamId = reserveSlot(slot, currentStreamId);
frame = prepareHeadersFrame(streamId, frame);
try
{
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
}
catch (Throwable x)
{
releaseSlotFlushAndFail(slot, promise, x);
}
}
private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame)
{
if (frame.getStreamId() <= 0)
{
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
}
return frame;
}
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int streamId = reserveSlot(slot, 0);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
try
{
IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData());
stream.setListener(listener);
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
}
catch (Throwable x)
{
releaseSlotFlushAndFail(slot, promise, x);
}
}
private void assignSlotAndFlush(Slot slot, ControlEntry entry)
{
// Every time a slot entry is assigned, we must flush.
slot.entry = entry;
flush();
}
private int reserveSlot(Slot slot, int streamId)
{
if (streamId <= 0)
{
synchronized (this)
{
streamId = localStreamIds.getAndAdd(2);
slots.offer(slot);
}
}
else
{
synchronized (this)
{
slots.offer(slot);
}
}
return streamId;
}
private void releaseSlotFlushAndFail(Slot slot, Promise<Stream> promise, Throwable x)
{
synchronized (this)
{
slots.remove(slot);
}
flush();
promise.failed(x);
}
/**
* Flush goes over the entries of the slots queue to flush the entries,
* until either one of the following two conditions is true:
* - The queue is empty.
* - It reaches a slot with a null entry.
* When a slot with a null entry is encountered, this means a concurrent thread reserved a slot
* but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering
* the null entry must bail out and it is up to the concurrent thread to finish up flushing.
* Note that only one thread can flush at any one time, if two threads happen to call flush
* concurrently, one will do the work while the other will bail out, so it is safe that all
* threads call flush after they're done reserving a slot and setting the entry.
*/
private void flush()
{
Thread thread = Thread.currentThread();
boolean queued = false;
while (true)
{
ControlEntry entry;
synchronized (this)
{
if (flushing == null)
flushing = thread;
else if (flushing != thread)
return; // Another thread is flushing.
Slot slot = slots.peek();
entry = slot == null ? null : slot.entry;
if (entry == null)
{
flushing = null;
// No more slots or null entry, so we may iterate on the flusher.
break;
}
slots.poll();
}
queued |= flusher.append(entry);
}
if (queued)
flusher.iterate();
}
private class Slot
{
private volatile ControlEntry entry;
}
}
}

View File

@ -26,6 +26,11 @@ public class PushPromiseFrame extends Frame
private final int promisedStreamId;
private final MetaData.Request metaData;
public PushPromiseFrame(int streamId, MetaData.Request metaData)
{
this(streamId, 0, metaData);
}
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData)
{
super(FrameType.PUSH_PROMISE);

View File

@ -61,7 +61,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels;
private boolean recycleHttpChannels = true;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{

View File

@ -64,7 +64,7 @@ public class PushedResourcesTest extends AbstractTest
{
HttpURI pushURI = HttpURI.from("http://localhost:" + connector.getLocalPort() + pushPath);
MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, HttpFields.EMPTY);
stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter<>()
stream.push(new PushPromiseFrame(stream.getId(), pushRequest), new Promise.Adapter<>()
{
@Override
public void succeeded(Stream pushStream)

View File

@ -94,7 +94,7 @@ public class HTTP2ServerConnection extends HTTP2Connection
private final AtomicLong totalResponses = new AtomicLong();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;
private boolean recycleHttpChannels = true;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{

View File

@ -272,7 +272,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}", request);
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<>()
stream.push(new PushPromiseFrame(stream.getId(), request), new Promise<>()
{
@Override
public void succeeded(Stream pushStream)