Fixes #2672 - Max local stream count exceeded for Http2 Client (#2693)

* Fixes #2672 - Max local stream count exceeded for HttpClient with HTTP/2 transport.

Now waiting for the server preface to arrive to the client before making
the connection available to the ConnectionPool.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-07-16 17:13:47 +02:00 committed by GitHub
parent 9fc7e909f6
commit 980282ef9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 275 additions and 14 deletions

View File

@ -66,6 +66,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Generator generator = new Generator(byteBufferPool);
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,

View File

@ -20,7 +20,10 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
@ -135,7 +138,12 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
if (HttpScheme.HTTPS.is(destination.getScheme()))
sslContextFactory = httpClient.getSslContextFactory();
client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
}
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
getHTTP2Client().connect(sslContextFactory, address, listener, promise, context);
}
@Override
@ -164,8 +172,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
private class SessionListenerPromise extends Session.Listener.Adapter implements Promise<Session>
{
private final AtomicMarkableReference<HttpConnectionOverHTTP2> connection = new AtomicMarkableReference<>(null, false);
private final Map<String, Object> context;
private HttpConnectionOverHTTP2 connection;
private SessionListenerPromise(Map<String, Object> context)
{
@ -175,14 +183,15 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
@Override
public void succeeded(Session session)
{
connection = newHttpConnection(destination(), session);
promise().succeeded(connection);
// This method is invoked when the client preface
// is sent, but we want to succeed the nested
// promise when the server preface is received.
}
@Override
public void failed(Throwable failure)
{
promise().failed(failure);
failConnectionPromise(failure);
}
private HttpDestinationOverHTTP2 destination()
@ -191,7 +200,7 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
}
@SuppressWarnings("unchecked")
private Promise<Connection> promise()
private Promise<Connection> connectionPromise()
{
return (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
@ -202,26 +211,55 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
if (!connection.isMarked())
onServerPreface(session);
}
private void onServerPreface(Session session)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
if (failConnectionPromise(new ClosedChannelException()))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
}
@Override
public boolean onIdleTimeout(Session session)
{
return connection.onIdleTimeout(((HTTP2Session)session).getEndPoint().getIdleTimeout());
long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout();
if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms")))
return true;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
return connection.onIdleTimeout(idleTimeout);
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{
HttpConnectionOverHTTP2 c = connection;
if (c != null)
c.close(failure);
if (failConnectionPromise(failure))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
connection.close(failure);
}
private boolean failConnectionPromise(Throwable failure)
{
boolean result = connection.compareAndSet(null, null, false, true);
if (result)
connectionPromise().failed(failure);
return result;
}
}
}

View File

@ -477,22 +477,33 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
ServerParser parser = new ServerParser(byteBufferPool, new ServerParser.Listener.Adapter()
{
@Override
public void onHeaders(HeadersFrame request)
public void onPreface()
{
// Server's preface.
generator.control(lease, new SettingsFrame(new HashMap<>(), false));
// Reply to client's SETTINGS.
generator.control(lease, new SettingsFrame(new HashMap<>(), true));
writeFrames();
}
@Override
public void onHeaders(HeadersFrame request)
{
// Response.
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame response = new HeadersFrame(request.getStreamId(), metaData, null, true);
generator.control(lease, response);
writeFrames();
}
private void writeFrames()
{
try
{
// Write the frames.
for (ByteBuffer buffer : lease.getByteBuffers())
output.write(BufferUtil.toArray(buffer));
lease.recycle();
}
catch (Throwable x)
{

View File

@ -18,33 +18,64 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Test;
public class MaxConcurrentStreamsTest extends AbstractTest
{
private void start(int maxConcurrentStreams, Handler handler) throws Exception
{
startServer(maxConcurrentStreams, handler);
prepareClient();
client.start();
}
private void startServer(int maxConcurrentStreams, Handler handler) throws Exception
{
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2.setMaxConcurrentStreams(maxConcurrentStreams);
prepareServer(http2);
server.setHandler(handler);
server.start();
prepareClient();
client.start();
}
@Test
@ -114,6 +145,85 @@ public class MaxConcurrentStreamsTest extends AbstractTest
);
}
@Test
public void testSmallMaxConcurrentStreamsExceededOnClient() throws Exception
{
int maxConcurrentStreams = 1;
startServer(maxConcurrentStreams, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
sleep(1000);
}
});
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
AtomicInteger connections = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> failures = new ArrayList<>();
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client())
{
@Override
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
super.connect(sslContextFactory, address, new Wrapper(listener)
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
// Send another request to simulate a request being
// sent concurrently with connection establishment.
// Sending this request will trigger the creation of
// another connection since maxConcurrentStream=1.
if (connections.incrementAndGet() == 1)
{
client.newRequest(host, port)
.path("/2")
.send(result ->
{
if (result.isSucceeded())
{
Response response2 = result.getResponse();
if (response2.getStatus() == HttpStatus.OK_200)
latch.countDown();
else
failures.add(new HttpResponseException("", response2));
}
else
{
failures.add(result.getFailure());
}
});
}
super.onSettings(session, frame);
}
}, promise, context);
}
}, null);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client.setExecutor(clientExecutor);
client.start();
// This request will be queued and establish the connection,
// which will trigger the send of the second request.
ContentResponse response1 = client.newRequest(host, port)
.path("/1")
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response1.getStatus());
Assert.assertTrue(failures.toString(), latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2, connections.get());
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
Assert.assertEquals(2, connectionPool.getConnectionCount());
}
@Test
public void testTwoConcurrentStreamsThirdWaits() throws Exception
{
@ -214,6 +324,49 @@ public class MaxConcurrentStreamsTest extends AbstractTest
Assert.assertTrue(latch.await(maxConcurrent * sleep / 2, TimeUnit.MILLISECONDS));
}
@Test
public void testManyConcurrentRequestsWithSmallConcurrentStreams() throws Exception
{
byte[] data = new byte[64 * 1024];
start(1, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(data);
}
});
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
int parallelism = 16;
int runs = 1;
int iterations = 256;
int total = parallelism * runs * iterations;
CountDownLatch latch = new CountDownLatch(total);
Queue<Result> failures = new ConcurrentLinkedQueue<>();
ForkJoinPool pool = new ForkJoinPool(parallelism);
pool.submit(() -> IntStream.range(0, parallelism).parallel().forEach(i ->
IntStream.range(0, runs).forEach(j ->
{
for (int k = 0; k < iterations; ++k)
{
client.newRequest("localhost", connector.getLocalPort())
.path("/" + i + "_" + j + "_" + k)
.send(result ->
{
if (result.isFailed())
failures.offer(result);
latch.countDown();
});
}
})));
Assert.assertTrue(latch.await(total * 10, TimeUnit.MILLISECONDS));
Assert.assertTrue(failures.toString(), failures.isEmpty());
}
private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
@ -233,4 +386,62 @@ public class MaxConcurrentStreamsTest extends AbstractTest
throw new RuntimeException(x);
}
}
private static class Wrapper implements Session.Listener
{
private final Session.Listener listener;
private Wrapper(Session.Listener listener)
{
this.listener = listener;
}
@Override
public Map<Integer, Integer> onPreface(Session session)
{
return listener.onPreface(session);
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return listener.onNewStream(stream, frame);
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
listener.onSettings(session, frame);
}
@Override
public void onPing(Session session, PingFrame frame)
{
listener.onPing(session, frame);
}
@Override
public void onReset(Session session, ResetFrame frame)
{
listener.onReset(session, frame);
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
listener.onClose(session, frame);
}
@Override
public boolean onIdleTimeout(Session session)
{
return listener.onIdleTimeout(session);
}
@Override
public void onFailure(Session session, Throwable failure)
{
listener.onFailure(session, failure);
}
}
}