Removed generation of padding bytes, which simplified the code a lot.

Implemented slicing of flow controlled data to never exceed the flow
control window.
This commit is contained in:
Simone Bordet 2014-06-13 14:08:56 +02:00
parent 5ed4f312cd
commit b30152df27
28 changed files with 392 additions and 416 deletions

View File

@ -37,7 +37,7 @@ public class HTTP2ClientSession extends HTTP2Session
public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
{
super(endPoint, generator, listener, flowControl, 1);
super(endPoint, generator, listener, flowControl, -1, 1);
}
@Override

View File

@ -25,6 +25,7 @@ import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
@ -53,14 +54,17 @@ public class AbstractTest
context.addServlet(new ServletHolder(servlet), path);
prepareClient();
server.start();
client.start();
}
protected void startServer(Session.Listener listener) throws Exception
protected void startServer(ServerSessionListener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
prepareClient();
server.start();
client.start();
}
private void prepareServer(ConnectionFactory connectionFactory)
@ -77,7 +81,6 @@ public class AbstractTest
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client(clientExecutor);
server.addBean(client);
}
protected Session newClient(Session.Listener listener) throws Exception
@ -93,6 +96,7 @@ public class AbstractTest
@After
public void dispose() throws Exception
{
client.stop();
server.stop();
}

View File

@ -22,19 +22,23 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
@ -48,9 +52,11 @@ public class FlowControlTest extends AbstractTest
// must stop sending data (although the initial window allows it).
final int size = 512;
final CountDownLatch dataLatch = new CountDownLatch(1);
// We get 3 data frames: the first of 1024 and 2 of 512 each
// after the flow control window has been reduced.
final CountDownLatch dataLatch = new CountDownLatch(3);
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
startServer(new Session.Listener.Adapter()
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -67,6 +73,7 @@ public class FlowControlTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataLatch.countDown();
int dataFrameCount = dataFrames.incrementAndGet();
if (dataFrameCount == 1)
{
@ -80,7 +87,6 @@ public class FlowControlTest extends AbstractTest
{
// Consume the data.
callback.succeeded();
dataLatch.countDown();
}
}
};
@ -108,7 +114,7 @@ public class FlowControlTest extends AbstractTest
settingsLatch.await(5, TimeUnit.SECONDS);
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.Adapter.INSTANCE);
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Consume the data arrived to server, this will resume flow control on the client.
@ -117,70 +123,67 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
/*
@Test
public void testServerFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final int length = 5 * windowSize;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
startServer(new ServerSessionListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter());
MetaData.Response metaData = new MetaData.Response(200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.Adapter.INSTANCE);
return null;
}
}), null);
});
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings));
Session session = newClient(new Session.Listener.Adapter());
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final Exchanger<DataInfo> exchanger = new Exchanger<>();
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
final CountDownLatch dataLatch = new CountDownLatch(1);
final Exchanger<Callback> exchanger = new Exchanger<>();
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
public void onData(Stream stream, DataFrame frame, Callback callback)
{
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1)
if (dataFrames == 1 || dataFrames == 2)
{
// Do not consume nor read from the data frame.
// We should then be flow-control stalled
exchanger.exchange(dataInfo);
// Do not consume the data frame.
// We should then be flow-control stalled.
exchanger.exchange(callback);
}
else if (dataFrames == 2)
else if (dataFrames == 3 || dataFrames == 4 || dataFrames == 5)
{
// Read but not consume, we should be flow-control stalled
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
// Consume totally.
callback.succeeded();
if (frame.isEndStream())
dataLatch.countDown();
}
else
{
@ -189,91 +192,83 @@ public class FlowControlTest extends AbstractTest
}
catch (InterruptedException x)
{
throw new SPDYException(x);
callback.failed(x);
}
}
});
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Callback callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
// Consume the first chunk.
callback.succeeded();
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
// Consume the second chunk.
callback.succeeded();
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
// Check that we are not flow control stalled
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
private void checkThatWeAreFlowControlStalled(Exchanger<Callback> exchanger) throws Exception
{
try
{
exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
catch (TimeoutException x)
{
// Expected.
}
}
@Test
public void testClientFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final Exchanger<DataInfo> exchanger = new Exchanger<>();
final Exchanger<Callback> exchanger = new Exchanger<>();
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public void onConnect(Session session)
public Map<Integer, Integer> onPreface(Session session)
{
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings), new FutureCallback());
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
return settings;
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
return new StreamFrameListener.Adapter()
MetaData.Response metaData = new MetaData.Response(200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return new Stream.Listener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
public void onData(Stream stream, DataFrame frame, Callback callback)
{
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1)
if (dataFrames == 1 || dataFrames == 2)
{
// Do not consume nor read from the data frame.
// We should then be flow-control stalled
exchanger.exchange(dataInfo);
// Do not consume the data frame.
// We should then be flow-control stalled.
exchanger.exchange(callback);
}
else if (dataFrames == 2)
else if (dataFrames == 3 || dataFrames == 4 || dataFrames == 5)
{
// Read but not consume, we should be flow-control stalled
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
// Consume totally.
callback.succeeded();
if (frame.isEndStream())
dataLatch.countDown();
}
else
{
@ -282,15 +277,17 @@ public class FlowControlTest extends AbstractTest
}
catch (InterruptedException x)
{
throw new SPDYException(x);
callback.failed(x);
}
}
};
}
}), new SessionFrameListener.Adapter()
});
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@ -298,37 +295,34 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), null);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, null);
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
final int length = 5 * windowSize;
stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter());
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.Adapter.INSTANCE);
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Callback callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
// Consume the first chunk.
callback.succeeded();
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
// Consume the second chunk.
callback.succeeded();
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
// Check that we are not flow control stalled
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
// TODO: add tests for session and stream flow control.
/*
@Test
public void testStreamsStalledDoesNotStallOtherStreams() throws Exception
{
@ -466,30 +460,5 @@ public class FlowControlTest extends AbstractTest
assertThat("all data bytes have been received by the client", allDataReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)
{
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
}
private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try
{
command.call();
Assert.fail();
}
catch (Exception x)
{
Assert.assertSame(exception, x.getClass());
}
}
*/
}

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.log.Logger;
public class HTTP2Connection extends AbstractConnection
{
private static final Logger LOG = Log.getLogger(HTTP2Connection.class);
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
private final ByteBufferPool byteBufferPool;
private final Parser parser;

View File

@ -21,9 +21,13 @@ package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2FlowControl implements FlowControl
{
protected static final Logger LOG = Log.getLogger(HTTP2FlowControl.class);
private volatile int initialWindowSize;
public HTTP2FlowControl(int initialWindowSize)
@ -86,6 +90,7 @@ public class HTTP2FlowControl implements FlowControl
// We currently send a WindowUpdate every time, even if the frame was very small.
// Other policies may send the WindowUpdate only upon reaching a threshold.
LOG.debug("Consumed {} on {}", length, stream);
// Negative streamId allow for generation of bytes for both stream and session
int streamId = stream != null ? -stream.getId() : 0;
WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length);
@ -95,7 +100,8 @@ public class HTTP2FlowControl implements FlowControl
@Override
public void onDataSent(ISession session, IStream stream, int length)
{
stream.getSession().updateWindowSize(length);
stream.updateWindowSize(length);
session.updateWindowSize(length);
if (stream != null)
stream.updateWindowSize(length);
}
}

View File

@ -24,11 +24,10 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -83,14 +82,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final Flusher flusher;
private volatile int maxStreamCount;
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialStreamId)
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId)
{
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.flusher = new Flusher(4);
this.maxStreamCount = -1;
this.maxStreamCount = maxStreams;
this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize());
}
@ -125,7 +124,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
public void succeeded()
{
int consumed = frame.getFlowControlledLength();
LOG.debug("Flow control: {} consumed on {}", consumed, stream);
flowControl.onDataConsumed(HTTP2Session.this, stream, consumed);
}
});
@ -185,7 +183,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (LOG.isDebugEnabled())
{
String reason = tryConvertPayload(frame.getPayload());
LOG.debug("Received {}: {}/{}", frame.getType(), frame.getError(), reason);
LOG.debug("Received {}: {}/'{}'", frame.getType(), frame.getError(), reason);
}
flusher.close();
@ -250,7 +248,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.setListener(listener);
FlusherEntry entry = new FlusherEntry(stream, frame, new PromiseCallback<>(promise, stream));
flusher.offer(entry);
flusher.append(entry);
}
// Iterate outside the synchronized block.
flusher.iterate();
@ -286,13 +284,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void frame(IStream stream, Frame frame, Callback callback)
{
int flowControlledLength = frame.getFlowControlledLength();
if (flowControlledLength > 0)
callback = new FlowControlCallback(stream, flowControlledLength, callback);
// We want to generate as late as possible to allow re-prioritization.
FlusherEntry entry = new FlusherEntry(stream, frame, callback);
LOG.debug("Sending {}", frame);
flusher.flush(entry);
flusher.append(entry);
flusher.iterate();
}
protected void disconnect()
@ -390,11 +386,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public int updateWindowSize(int delta)
public void updateWindowSize(int delta)
{
int oldSize = windowSize.getAndAdd(delta);
LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this);
return oldSize;
if (delta != 0)
{
int oldSize = windowSize.getAndAdd(delta);
HTTP2FlowControl.LOG.debug("Updated session window {} -> {} for {}", oldSize, oldSize + delta, this);
}
}
private void updateLastStreamId(int streamId)
@ -437,20 +435,20 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private class Flusher extends IteratingCallback
{
private final ArrayQueue<FlusherEntry> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH);
private final Set<IStream> stalled = new HashSet<>();
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<FlusherEntry> reset = new ArrayList<>();
private final ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool());
private final int maxGather;
private final List<FlusherEntry> active;
private boolean closed;
public Flusher(int maxGather)
private Flusher(int maxGather)
{
this.maxGather = maxGather;
this.active = new ArrayList<>(maxGather);
}
private void offer(FlusherEntry entry)
private void append(FlusherEntry entry)
{
boolean fail = false;
synchronized (queue)
@ -464,7 +462,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
closed(entry);
}
public int getQueueSize()
private void prepend(FlusherEntry entry)
{
boolean fail = false;
synchronized (queue)
{
if (closed)
fail = true;
else
queue.add(0, entry);
}
if (fail)
closed(entry);
}
private int getQueueSize()
{
synchronized (queue)
{
@ -472,12 +484,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
private void flush(FlusherEntry entry)
{
offer(entry);
iterate();
}
@Override
protected Action process() throws Exception
{
@ -486,19 +492,20 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (closed)
return Action.IDLE;
int sessionWindow = getWindowSize();
int nonStalledIndex = 0;
int size = queue.size();
while (nonStalledIndex < size)
{
FlusherEntry entry = queue.get(nonStalledIndex);
IStream stream = entry.getStream();
boolean flowControlled = entry.getFrame().getFlowControlledLength() > 0;
if (flowControlled)
IStream stream = entry.stream;
int frameWindow = entry.frame.getFlowControlledLength();
if (frameWindow > 0)
{
// Is the session stalled ?
if (getWindowSize() <= 0)
if (sessionWindow <= 0)
{
LOG.debug("Flow control: session stalled {}", HTTP2Session.this);
HTTP2FlowControl.LOG.debug("Session stalled {}", HTTP2Session.this);
++nonStalledIndex;
// There may be *non* flow controlled frames to send.
continue;
@ -506,18 +513,17 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
{
// Is it a frame belonging to an already stalled stream ?
if (stalled.contains(stream))
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
++nonStalledIndex;
continue;
streamWindow = stream.getWindowSize();
streams.put(stream, streamWindow);
}
// Is the stream stalled ?
if (stream.getWindowSize() <= 0)
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
LOG.debug("Flow control: stream stalled {}", stream);
stalled.add(stream);
HTTP2FlowControl.LOG.debug("Stream stalled {}", stream);
++nonStalledIndex;
continue;
}
@ -528,18 +534,23 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
queue.remove(nonStalledIndex);
--size;
// Has the stream been reset ?
if (stream != null && stream.isReset() && flowControlled)
// If the stream has been reset, don't send flow controlled frames.
if (stream != null && stream.isReset() && frameWindow > 0)
{
reset.add(entry);
continue;
}
// Reduce the flow control windows.
sessionWindow -= frameWindow;
if (stream != null && frameWindow > 0)
streams.put(stream, streams.get(stream) - frameWindow);
active.add(entry);
if (active.size() == maxGather)
break;
}
stalled.clear();
streams.clear();
}
for (int i = 0; i < reset.size(); ++i)
@ -556,7 +567,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
for (int i = 0; i < active.size(); ++i)
{
FlusherEntry entry = active.get(i);
generator.generate(lease, entry.getFrame());
entry.generate(lease);
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
@ -580,6 +591,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void failed(Throwable x)
{
LOG.debug(x);
lease.recycle();
for (int i = 0; i < active.size(); ++i)
{
@ -625,6 +637,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final IStream stream;
private final Frame frame;
private final Callback callback;
private int length;
private FlusherEntry(IStream stream, Frame frame, Callback callback)
{
@ -633,20 +646,45 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.callback = callback;
}
public IStream getStream()
public void generate(ByteBufferPool.Lease lease)
{
return stream;
}
try
{
int windowSize = stream == null ? getWindowSize() : stream.getWindowSize();
int frameLength = frame.getFlowControlledLength();
if (frameLength > 0)
this.length = Math.min(frameLength, windowSize);
public Frame getFrame()
{
return frame;
generator.generate(lease, frame, windowSize);
LOG.debug("Generated {}, windowSize={}", frame, windowSize);
}
catch (Throwable x)
{
LOG.debug("Frame generation failure", x);
failed(x);
}
}
@Override
public void succeeded()
{
callback.succeeded();
flowControl.onDataSent(HTTP2Session.this, stream, -length);
// Do we have more to send ?
if (frame.getFlowControlledLength() > 0)
{
// We have written part of the frame, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that other
// frames for the same stream are written before this one is finished.
flusher.prepend(this);
}
else
{
callback.succeeded();
// TODO: what below is needed ? YES IT IS.
// stream.updateCloseState(dataInfo.isClose(), true);
// if (stream.isClosed())
// removeStream(stream);
}
}
@Override
@ -656,12 +694,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
public class PromiseCallback<C> implements Callback
private class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;
private final C value;
public PromiseCallback(Promise<C> promise, C value)
private PromiseCallback(Promise<C> promise, C value)
{
this.promise = promise;
this.value = value;
@ -679,31 +717,4 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
promise.failed(x);
}
}
private class FlowControlCallback implements Callback
{
private final IStream stream;
private final int length;
private final Callback callback;
private FlowControlCallback(IStream stream, int length, Callback callback)
{
this.stream = stream;
this.length = length;
this.callback = callback;
}
@Override
public void succeeded()
{
flowControl.onDataSent(HTTP2Session.this, stream, -length);
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
}

View File

@ -205,11 +205,13 @@ public class HTTP2Stream implements IStream
}
@Override
public int updateWindowSize(int delta)
public void updateWindowSize(int delta)
{
int oldSize = windowSize.getAndAdd(delta);
LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this);
return oldSize;
if (delta != 0)
{
int oldSize = windowSize.getAndAdd(delta);
HTTP2FlowControl.LOG.debug("Updated stream window {} -> {} for {}", oldSize, oldSize + delta, this);
}
}
protected boolean notifyData(DataFrame frame, Callback callback)

View File

@ -29,5 +29,5 @@ public interface ISession extends Session
public void frame(IStream stream, Frame frame, Callback callback);
public int updateWindowSize(int delta);
public void updateWindowSize(int delta);
}

View File

@ -46,5 +46,5 @@ public interface IStream extends Stream
public int getWindowSize();
public int updateWindowSize(int delta);
public void updateWindowSize(int delta);
}

View File

@ -18,17 +18,27 @@
package org.eclipse.jetty.http2.api.server;
import java.util.Map;
import org.eclipse.jetty.http2.api.Session;
public interface ServerSessionListener extends Session.Listener
{
public void onConnect(Session session);
public Map<Integer,Integer> onPreface(Session session);
public static class Adapter extends Session.Listener.Adapter implements ServerSessionListener
{
@Override
public void onConnect(Session session)
{
}
@Override
public Map<Integer, Integer> onPreface(Session session)
{
return null;
}
}
}

View File

@ -25,20 +25,13 @@ public class DataFrame extends Frame
private final int streamId;
private final ByteBuffer data;
private final boolean endStream;
private final int length;
public DataFrame(int streamId, ByteBuffer data, boolean endStream)
{
this(streamId, data, endStream, 0);
}
public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding)
{
super(FrameType.DATA);
this.streamId = streamId;
this.data = data;
this.endStream = endStream;
this.length = data.remaining() + padding;
}
public int getStreamId()
@ -59,12 +52,12 @@ public class DataFrame extends Frame
@Override
public int getFlowControlledLength()
{
return length;
return data.remaining();
}
@Override
public String toString()
{
return String.format("%s{length:%d/%d}", super.toString(), data.remaining(), length);
return String.format("%s{length:%d,end=%b}", super.toString(), data.remaining(), endStream);
}
}

View File

@ -35,85 +35,58 @@ public class DataGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
DataFrame dataFrame = (DataFrame)frame;
generateData(lease, dataFrame.getStreamId(), dataFrame.getData(), dataFrame.isEndStream(), false, null);
generateData(lease, dataFrame.getStreamId(), dataFrame.getData(), dataFrame.isEndStream(), maxLength);
}
public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, boolean compress, byte[] paddingBytes)
public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
int paddingLength = paddingBytes == null ? 0 : paddingBytes.length;
// Leave space for at least one byte of content.
if (paddingLength > Frame.MAX_LENGTH - 3)
throw new IllegalArgumentException("Invalid padding length: " + paddingLength);
if (compress)
throw new IllegalArgumentException("Data compression not supported");
int extraPaddingBytes = paddingLength > 0xFF ? 2 : paddingLength > 0 ? 1 : 0;
int dataLength = data.remaining();
if (dataLength <= maxLength && dataLength <= Frame.MAX_LENGTH)
{
// Single frame.
generateFrame(lease, streamId, data, last);
return;
}
// Can we fit just one frame ?
if (dataLength + extraPaddingBytes + paddingLength <= Frame.MAX_LENGTH)
// Other cases, we need to slice the original buffer into multiple frames.
int length = Math.min(maxLength, dataLength);
int dataBytesPerFrame = Frame.MAX_LENGTH;
int frames = length / dataBytesPerFrame;
if (frames * dataBytesPerFrame != length)
++frames;
int begin = data.position();
int end = data.limit();
for (int i = 1; i <= frames; ++i)
{
generateData(lease, streamId, data, last, compress, extraPaddingBytes, paddingBytes);
}
else
{
int dataBytesPerFrame = Frame.MAX_LENGTH - extraPaddingBytes - paddingLength;
int frames = dataLength / dataBytesPerFrame;
if (frames * dataBytesPerFrame != dataLength)
{
++frames;
}
int limit = data.limit();
for (int i = 1; i <= frames; ++i)
{
data.limit(Math.min(dataBytesPerFrame * i, limit));
ByteBuffer slice = data.slice();
data.position(data.limit());
generateData(lease, streamId, slice, i == frames && last, compress, extraPaddingBytes, paddingBytes);
}
data.limit(begin + Math.min(dataBytesPerFrame * i, length));
ByteBuffer slice = data.slice();
data.position(data.limit());
generateFrame(lease, streamId, slice, i == frames && last);
}
data.limit(end);
}
private void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, boolean compress, int extraPaddingBytes, byte[] paddingBytes)
private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last)
{
int paddingLength = paddingBytes == null ? 0 : paddingBytes.length;
int length = extraPaddingBytes + data.remaining() + paddingLength;
int length = data.remaining();
int flags = Flag.NONE;
if (last)
flags |= Flag.END_STREAM;
if (extraPaddingBytes > 0)
flags |= Flag.PADDING_LOW;
if (extraPaddingBytes > 1)
flags |= Flag.PADDING_HIGH;
if (compress)
flags |= Flag.COMPRESS;
ByteBuffer header = generateHeader(lease, FrameType.DATA, Frame.HEADER_LENGTH + extraPaddingBytes, length, flags, streamId);
if (extraPaddingBytes == 2)
{
header.putShort((short)paddingLength);
}
else if (extraPaddingBytes == 1)
{
header.put((byte)paddingLength);
}
ByteBuffer header = generateHeader(lease, FrameType.DATA, length, flags, streamId);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
lease.append(data, false);
if (paddingBytes != null)
{
lease.append(ByteBuffer.wrap(paddingBytes), false);
}
}
}

View File

@ -33,15 +33,10 @@ public abstract class FrameGenerator
this.headerGenerator = headerGenerator;
}
public abstract void generate(ByteBufferPool.Lease lease, Frame frame);
public abstract void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength);
protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId)
{
return generateHeader(lease, frameType, Frame.HEADER_LENGTH + length, length, flags, streamId);
}
protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int capacity, int length, int flags, int streamId)
{
return headerGenerator.generate(lease, frameType, capacity, length, flags, streamId);
return headerGenerator.generate(lease, frameType, Frame.HEADER_LENGTH + length, length, flags, streamId);
}
}

View File

@ -68,8 +68,8 @@ public class Generator
return headerTableSize;
}
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
generators[frame.getType().getType()].generate(lease, frame);
generators[frame.getType().getType()].generate(lease, frame, maxLength);
}
}

View File

@ -35,7 +35,7 @@ public class GoAwayGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload());

View File

@ -40,52 +40,30 @@ public class HeadersGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
HeadersFrame headersFrame = (HeadersFrame)frame;
generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream(), null);
generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream());
}
private void generate(ByteBufferPool.Lease lease, int streamId, MetaData metaData, boolean contentFollows, byte[] paddingBytes)
private void generate(ByteBufferPool.Lease lease, int streamId, MetaData metaData, boolean contentFollows)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
int paddingLength = paddingBytes == null ? 0 : paddingBytes.length;
// Leave space for at least one byte of content.
if (paddingLength > Frame.MAX_LENGTH - 3)
throw new IllegalArgumentException("Invalid padding length: " + paddingLength);
int extraPaddingBytes = paddingLength > 0xFF ? 2 : paddingLength > 0 ? 1 : 0;
encoder.encode(metaData, lease);
long hpackLength = lease.getTotalLength();
long length = extraPaddingBytes + hpackLength + paddingLength;
long length = lease.getTotalLength();
if (length > Frame.MAX_LENGTH)
throw new IllegalArgumentException("Invalid headers, too big");
int flags = Flag.END_HEADERS;
if (!contentFollows)
flags |= Flag.END_STREAM;
if (extraPaddingBytes > 0)
flags |= Flag.PADDING_LOW;
if (extraPaddingBytes > 1)
flags |= Flag.PADDING_HIGH;
ByteBuffer header = generateHeader(lease, FrameType.HEADERS, Frame.HEADER_LENGTH + extraPaddingBytes, (int)length, flags, streamId);
if (extraPaddingBytes == 2)
header.putShort((short)paddingLength);
else if (extraPaddingBytes == 1)
header.put((byte)paddingLength);
ByteBuffer header = generateHeader(lease, FrameType.HEADERS, (int)length, flags, streamId);
BufferUtil.flipToFlush(header, 0);
lease.prepend(header, true);
if (paddingBytes != null)
{
lease.append(ByteBuffer.wrap(paddingBytes), false);
}
}
}

View File

@ -35,7 +35,7 @@ public class PingGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
PingFrame pingFrame = (PingFrame)frame;
generatePing(lease, pingFrame.getPayload(), pingFrame.isReply());

View File

@ -35,7 +35,7 @@ public class PriorityGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
PriorityFrame priorityFrame = (PriorityFrame)frame;
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());

View File

@ -35,7 +35,7 @@ public class ResetGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
ResetFrame resetFrame = (ResetFrame)frame;
generateReset(lease, resetFrame.getStreamId(), resetFrame.getError());

View File

@ -36,7 +36,7 @@ public class SettingsGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());

View File

@ -35,7 +35,7 @@ public class WindowUpdateGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{
WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame;
generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta());

View File

@ -49,7 +49,7 @@ public class DataBodyParser extends BodyParser
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "invalid_data_frame");
return false;
}
return onData(BufferUtil.EMPTY_BUFFER, false, 0);
return onData(BufferUtil.EMPTY_BUFFER, false);
}
@Override
@ -112,11 +112,11 @@ public class DataBodyParser extends BodyParser
buffer.position(position + size);
length -= size;
loop = paddingLength == 0;
if (length == 0)
{
state = State.PADDING;
if (onData(slice, false, paddingLength))
loop = paddingLength == 0;
if (onData(slice, false))
{
return Result.ASYNC;
}
@ -125,7 +125,7 @@ public class DataBodyParser extends BodyParser
{
// TODO: check the semantic of Flag.END_SEGMENT.
// We got partial data, simulate a smaller frame, and stay in DATA state.
if (onData(slice, true, 0))
if (onData(slice, true))
{
return Result.ASYNC;
}
@ -153,9 +153,9 @@ public class DataBodyParser extends BodyParser
return Result.PENDING;
}
private boolean onData(ByteBuffer buffer, boolean fragment, int padding)
private boolean onData(ByteBuffer buffer, boolean fragment)
{
DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream(), padding);
DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream());
return notifyData(frame);
}

View File

@ -48,47 +48,18 @@ public class DataGenerateParseTest
@Test
public void testGenerateParseNoContentNoPadding()
{
testGenerateParseContent(0, BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParseNoContentSmallPadding()
{
testGenerateParseContent(128, BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParseNoContentLargePadding()
{
testGenerateParseContent(1024, BufferUtil.EMPTY_BUFFER);
testGenerateParseContent(BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParseSmallContentNoPadding()
{
testGenerateParseSmallContent(0);
testGenerateParseContent(ByteBuffer.wrap(smallContent));
}
@Test
public void testGenerateParseSmallContentSmallPadding()
private void testGenerateParseContent(ByteBuffer content)
{
testGenerateParseSmallContent(128);
}
@Test
public void testGenerateParseSmallContentLargePadding()
{
testGenerateParseSmallContent(1024);
}
private void testGenerateParseSmallContent(int paddingLength)
{
testGenerateParseContent(paddingLength, ByteBuffer.wrap(smallContent));
}
private void testGenerateParseContent(int paddingLength, ByteBuffer content)
{
List<DataFrame> frames = testGenerateParse(paddingLength, content);
List<DataFrame> frames = testGenerateParse(content);
Assert.assertEquals(1, frames.size());
DataFrame frame = frames.get(0);
Assert.assertTrue(frame.getStreamId() != 0);
@ -98,26 +69,9 @@ public class DataGenerateParseTest
@Test
public void testGenerateParseLargeContent()
{
testGenerateParseLargeContent(0);
}
@Test
public void testGenerateParseLargeContentSmallPadding()
{
testGenerateParseLargeContent(128);
}
@Test
public void testGenerateParseLargeContentLargePadding()
{
testGenerateParseLargeContent(1024);
}
private void testGenerateParseLargeContent(int paddingLength)
{
ByteBuffer content = ByteBuffer.wrap(largeContent);
List<DataFrame> frames = testGenerateParse(paddingLength, content);
List<DataFrame> frames = testGenerateParse(content);
Assert.assertEquals(9, frames.size());
ByteBuffer aggregate = ByteBuffer.allocate(content.remaining());
for (int i = 1; i <= frames.size(); ++i)
@ -131,7 +85,7 @@ public class DataGenerateParseTest
Assert.assertEquals(content, aggregate);
}
private List<DataFrame> testGenerateParse(int paddingLength, ByteBuffer... data)
private List<DataFrame> testGenerateParse(ByteBuffer data)
{
DataGenerator generator = new DataGenerator(new HeaderGenerator());
@ -140,10 +94,7 @@ public class DataGenerateParseTest
for (int i = 0; i < 2; ++i)
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
for (int j = 1; j <= data.length; ++j)
{
generator.generateData(lease, 13, data[j - 1].slice(), j == data.length, false, new byte[paddingLength]);
}
generator.generateData(lease, 13, data.slice(), true, data.remaining());
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@ -171,7 +122,8 @@ public class DataGenerateParseTest
DataGenerator generator = new DataGenerator(new HeaderGenerator());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateData(lease, 13, ByteBuffer.wrap(largeContent).slice(), true, false, new byte[1024]);
ByteBuffer data = ByteBuffer.wrap(largeContent);
generator.generateData(lease, 13, data.slice(), true, data.remaining());
final List<DataFrame> frames = new ArrayList<>();
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()

View File

@ -18,12 +18,16 @@
package org.eclipse.jetty.http2.server;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
@ -33,6 +37,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
{
private int headerTableSize = 4096;
private int initialWindowSize = 65535;
private int maxConcurrentStreams = -1;
public AbstractHTTP2ServerConnectionFactory()
{
@ -59,21 +64,63 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.initialWindowSize = initialWindowSize;
}
public int getMaxConcurrentStreams()
{
return maxConcurrentStreams;
}
public void setMaxConcurrentStreams(int maxConcurrentStreams)
{
this.maxConcurrentStreams = maxConcurrentStreams;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
Session.Listener listener = newSessionListener(connector, endPoint);
ServerSessionListener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener,
new HTTP2FlowControl(getInitialWindowSize()));
new HTTP2FlowControl(getInitialWindowSize()), getMaxConcurrentStreams());
Parser parser = new ServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, getInputBufferSize());
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, getInputBufferSize(), listener, session);
return configure(connection, connector, endPoint);
}
protected abstract Session.Listener newSessionListener(Connector connector, EndPoint endPoint);
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
private class HTTP2ServerConnection extends HTTP2Connection
{
private final ServerSessionListener listener;
private final Session session;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, int inputBufferSize, ServerSessionListener listener, Session session)
{
super(byteBufferPool, executor, endPoint, parser, inputBufferSize);
this.listener = listener;
this.session = session;
}
@Override
public void onOpen()
{
super.onOpen();
notifyConnect(session);
}
private void notifyConnect(Session session)
{
try
{
listener.onConnect(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
}
}

View File

@ -18,12 +18,16 @@
package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
@ -44,7 +48,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
}
@Override
protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint)
protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
{
return new HTTPServerSessionListener(connector, httpConfiguration, endPoint);
}
@ -62,6 +66,18 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
this.endPoint = endPoint;
}
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getHeaderTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialWindowSize());
int maxConcurrentStreams = getMaxConcurrentStreams();
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
return settings;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{

View File

@ -18,36 +18,43 @@
package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import java.util.Collections;
import java.util.Map;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Listener
{
public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
private static final Logger LOG = Log.getLogger(HTTP2ServerSession.class);
private final ServerSessionListener listener;
public HTTP2ServerSession(EndPoint endPoint, Generator generator, ServerSessionListener listener, FlowControl flowControl, int maxStreams)
{
super(endPoint, generator, listener, flowControl, 2);
super(endPoint, generator, listener, flowControl, maxStreams, 2);
this.listener = listener;
}
@Override
public boolean onPreface()
{
// SPEC: send a SETTINGS frame upon receiving the preface.
HashMap<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getGenerator().getHeaderTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getFlowControl().getInitialWindowSize());
int maxConcurrentStreams = getMaxStreamCount();
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
Map<Integer, Integer> settings = notifyPreface(this);
if (settings == null)
settings = Collections.emptyMap();
SettingsFrame frame = new SettingsFrame(settings, false);
settings(frame, disconnectCallback);
return false;
@ -69,4 +76,17 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
return false;
}
private Map<Integer, Integer> notifyPreface(Session session)
{
try
{
return listener.onPreface(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return null;
}
}
}

View File

@ -18,21 +18,21 @@
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory
{
private final Session.Listener listener;
private final ServerSessionListener listener;
public RawHTTP2ServerConnectionFactory(Session.Listener listener)
public RawHTTP2ServerConnectionFactory(ServerSessionListener listener)
{
this.listener = listener;
}
@Override
protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint)
protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
{
return listener;
}

View File

@ -105,7 +105,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
generator.generate(lease, request, 0);
// No preface bytes
@ -154,7 +154,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
generator.generate(lease, request, 0);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))
@ -217,7 +217,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
generator.generate(lease, request, 0);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))