Merge branch 'jetty-http2' into http2_flow_control

This commit is contained in:
Simone Bordet 2014-08-05 10:01:30 +02:00
commit 22cdca9131
3 changed files with 236 additions and 16 deletions

View File

@ -0,0 +1,188 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class StreamCountTest extends AbstractTest
{
@Test
public void testServersAllowsOneStreamEnforcedByClient() throws Exception
{
startServer(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 1);
return settings;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
{
HttpFields fields = new HttpFields();
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
}
};
}
});
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
responseLatch.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter());
try
{
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException x)
{
// Expected
}
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testServersAllowsOneStreamEnforcedByServer() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HTTP2Session session = (HTTP2Session)stream.getSession();
session.setMaxRemoteStreams(1);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
{
HttpFields fields = new HttpFields();
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
}
};
}
});
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
resetLatch.countDown();
}
});
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(1, metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
responseLatch.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter());
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback.Adapter());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -77,7 +77,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger(); private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger(); private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger(); private final AtomicInteger localStreamCount = new AtomicInteger();
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicInteger windowSize = new AtomicInteger(); private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler; private final Scheduler scheduler;
@ -86,7 +87,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final Listener listener; private final Listener listener;
private final FlowControl flowControl; private final FlowControl flowControl;
private final Flusher flusher; private final Flusher flusher;
private volatile int maxStreamCount; private int maxLocalStreams;
private int maxRemoteStreams;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId) public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId)
{ {
@ -96,11 +98,22 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.listener = listener; this.listener = listener;
this.flowControl = flowControl; this.flowControl = flowControl;
this.flusher = new Flusher(4); this.flusher = new Flusher(4);
this.maxStreamCount = maxStreams; this.maxLocalStreams = maxStreams;
this.maxRemoteStreams = maxStreams;
this.streamIds.set(initialStreamId); this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize()); this.windowSize.set(flowControl.getInitialWindowSize());
} }
public int getMaxRemoteStreams()
{
return maxRemoteStreams;
}
public void setMaxRemoteStreams(int maxRemoteStreams)
{
this.maxRemoteStreams = maxRemoteStreams;
}
public Generator getGenerator() public Generator getGenerator()
{ {
return generator; return generator;
@ -185,9 +198,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{ {
maxStreamCount = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); maxLocalStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updated max concurrent streams to {}", maxStreamCount); LOG.debug("Updated max local concurrent streams to {}", maxLocalStreams);
} }
if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE)) if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
{ {
@ -295,7 +308,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
@Override @Override
public void newStream(HeadersFrame frame, final Promise<Stream> promise, Stream.Listener listener) public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{ {
// Synchronization is necessary to atomically create // Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent. // the stream id and enqueue the frame to be sent.
@ -306,12 +319,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(), priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(),
priority.getWeight(), priority.isExclusive()); priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
final IStream stream = createLocalStream(frame); final IStream stream = createLocalStream(frame, promise);
if (stream == null) if (stream == null)
{
promise.failed(new IllegalStateException());
return; return;
}
stream.updateClose(frame.isEndStream(), true); stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener); stream.setListener(listener);
@ -385,8 +395,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.iterate(); flusher.iterate();
} }
protected IStream createLocalStream(HeadersFrame frame) protected IStream createLocalStream(HeadersFrame frame, Promise<Stream> promise)
{ {
while (true)
{
int localCount = localStreamCount.get();
int maxCount = maxLocalStreams;
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
break;
}
IStream stream = newStream(frame); IStream stream = newStream(frame);
int streamId = stream.getId(); int streamId = stream.getId();
if (streams.putIfAbsent(streamId, stream) == null) if (streams.putIfAbsent(streamId, stream) == null)
@ -399,6 +422,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
else else
{ {
promise.failed(new IllegalStateException("Duplicate stream " + streamId));
return null; return null;
} }
} }
@ -410,14 +434,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: exceeding max concurrent streams is treated as stream error. // SPEC: exceeding max concurrent streams is treated as stream error.
while (true) while (true)
{ {
int currentStreams = streamCount.get(); int remoteCount = remoteStreamCount.get();
int maxStreams = maxStreamCount; int maxCount = getMaxRemoteStreams();
if (maxStreams >= 0 && currentStreams >= maxStreams) if (maxCount >= 0 && remoteCount >= maxCount)
{ {
reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), disconnectOnFailure()); reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), disconnectOnFailure());
return null; return null;
} }
if (streamCount.compareAndSet(currentStreams, currentStreams + 1)) if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
break; break;
} }
@ -453,7 +477,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
assert removed == stream; assert removed == stream;
if (local) if (local)
streamCount.decrementAndGet(); localStreamCount.decrementAndGet();
else
remoteStreamCount.decrementAndGet();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Removed {}", stream); LOG.debug("Removed {}", stream);

View File

@ -113,6 +113,12 @@ public class MetaDataBuilder
_authority=(field instanceof HostPortHttpField)?((HostPortHttpField)field):new AuthorityHttpField(field.getValue()); _authority=(field instanceof HostPortHttpField)?((HostPortHttpField)field):new AuthorityHttpField(field.getValue());
break; break;
case HOST:
if (_authority==null)
_authority=(field instanceof HostPortHttpField)?((HostPortHttpField)field):new AuthorityHttpField(field.getValue());
_fields.add(field);
break;
case C_PATH: case C_PATH:
_path = field.getValue(); _path = field.getValue();
break; break;