Fixed handling of max concurrent streams.

There is a difference between the value set via configuration, that
always refer to remote streams (streams initiated by remote peers),
and the value received via SETTINGS frame, that always refer to local
streams (streams initiated locally).
This commit is contained in:
Simone Bordet 2014-08-05 01:44:34 +02:00
parent 48d68a4916
commit 636c7eaeae
2 changed files with 230 additions and 16 deletions

View File

@ -0,0 +1,188 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class StreamCountTest extends AbstractTest
{
@Test
public void testServersAllowsOneStreamEnforcedByClient() throws Exception
{
startServer(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 1);
return settings;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
{
HttpFields fields = new HttpFields();
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
}
};
}
});
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
responseLatch.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter());
try
{
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException x)
{
// Expected
}
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testServersAllowsOneStreamEnforcedByServer() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HTTP2Session session = (HTTP2Session)stream.getSession();
session.setMaxRemoteStreams(1);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
{
HttpFields fields = new HttpFields();
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
}
};
}
});
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
resetLatch.countDown();
}
});
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
responseLatch.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter());
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -77,7 +77,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler;
@ -86,7 +87,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final Listener listener;
private final FlowControl flowControl;
private final Flusher flusher;
private volatile int maxStreamCount;
private int maxLocalStreams;
private int maxRemoteStreams;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId)
{
@ -96,11 +98,22 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.listener = listener;
this.flowControl = flowControl;
this.flusher = new Flusher(4);
this.maxStreamCount = maxStreams;
this.maxLocalStreams = maxStreams;
this.maxRemoteStreams = maxStreams;
this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize());
}
public int getMaxRemoteStreams()
{
return maxRemoteStreams;
}
public void setMaxRemoteStreams(int maxRemoteStreams)
{
this.maxRemoteStreams = maxRemoteStreams;
}
public Generator getGenerator()
{
return generator;
@ -185,9 +198,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{
maxStreamCount = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
maxLocalStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
if (LOG.isDebugEnabled())
LOG.debug("Updated max concurrent streams to {}", maxStreamCount);
LOG.debug("Updated max local concurrent streams to {}", maxLocalStreams);
}
if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
{
@ -295,7 +308,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public void newStream(HeadersFrame frame, final Promise<Stream> promise, Stream.Listener listener)
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
@ -306,12 +319,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
final IStream stream = createLocalStream(frame);
final IStream stream = createLocalStream(frame, promise);
if (stream == null)
{
promise.failed(new IllegalStateException());
return;
}
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);
@ -385,8 +395,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.iterate();
}
protected IStream createLocalStream(HeadersFrame frame)
protected IStream createLocalStream(HeadersFrame frame, Promise<Stream> promise)
{
while (true)
{
int localCount = localStreamCount.get();
int maxCount = maxLocalStreams;
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
break;
}
IStream stream = newStream(frame);
int streamId = stream.getId();
if (streams.putIfAbsent(streamId, stream) == null)
@ -399,6 +422,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
else
{
promise.failed(new IllegalStateException("Duplicate stream " + streamId));
return null;
}
}
@ -410,14 +434,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
int currentStreams = streamCount.get();
int maxStreams = maxStreamCount;
if (maxStreams >= 0 && currentStreams >= maxStreams)
int remoteCount = remoteStreamCount.get();
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount >= maxCount)
{
reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), disconnectOnFailure());
return null;
}
if (streamCount.compareAndSet(currentStreams, currentStreams + 1))
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
break;
}
@ -453,7 +477,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
assert removed == stream;
if (local)
streamCount.decrementAndGet();
localStreamCount.decrementAndGet();
else
remoteStreamCount.decrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("Removed {}", stream);