Implemented flow control.

This commit is contained in:
Simone Bordet 2014-06-12 13:51:46 +02:00
parent 74bf0000c5
commit 5ed4f312cd
31 changed files with 1222 additions and 259 deletions

View File

@ -51,8 +51,6 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Client extends ContainerLifeCycle
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final Executor executor;
private final Scheduler scheduler;
private final SelectorManager selector;
private final ByteBufferPool byteBufferPool;
@ -63,9 +61,8 @@ public class HTTP2Client extends ContainerLifeCycle
public HTTP2Client(Executor executor)
{
this.executor = executor;
addBean(executor);
this.scheduler = new ScheduledExecutorScheduler();
Scheduler scheduler = new ScheduledExecutorScheduler();
addBean(scheduler, true);
this.selector = new ClientSelectorManager(executor, scheduler);
addBean(selector, true);
@ -121,7 +118,7 @@ public class HTTP2Client extends ContainerLifeCycle
{
Context context = (Context)attachment;
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(), 65535);
HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(65535));
Parser parser = new Parser(byteBufferPool, session);
Connection connection = new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, 8192, session);
context.promise.succeeded(session);

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -34,9 +35,9 @@ public class HTTP2ClientSession extends HTTP2Session
{
private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class);
public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize)
public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
{
super(endPoint, generator, listener, flowControl, initialWindowSize, 1);
super(endPoint, generator, listener, flowControl, 1);
}
@Override
@ -52,7 +53,7 @@ public class HTTP2ClientSession extends HTTP2Session
else
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame);
stream.process(frame, Callback.Adapter.INSTANCE);
notifyHeaders(stream, frame);
if (stream.isClosed())
removeStream(stream, false);

View File

@ -0,0 +1,106 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
public class AbstractTest
{
private ServerConnector connector;
private String path = "/test";
private HTTP2Client client;
private Server server;
protected void startServer(HttpServlet servlet) throws Exception
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(new ServletHolder(servlet), path);
prepareClient();
server.start();
}
protected void startServer(Session.Listener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
prepareClient();
server.start();
}
private void prepareServer(ConnectionFactory connectionFactory)
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, connectionFactory);
server.addConnector(connector);
}
private void prepareClient()
{
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client(clientExecutor);
server.addBean(client);
}
protected Session newClient(Session.Listener listener) throws Exception
{
String host = "localhost";
int port = connector.getLocalPort();
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, listener, promise);
return promise.get(5, TimeUnit.SECONDS);
}
@After
public void dispose() throws Exception
{
server.stop();
}
protected MetaData.Request newRequest(String method, HttpFields fields)
{
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
return new MetaData.Request(HttpScheme.HTTP, method, authority, host, port, path, fields);
}
}

View File

@ -0,0 +1,33 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class EmptyHttpServlet extends HttpServlet
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
}
}

View File

@ -0,0 +1,495 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class FlowControlTest extends AbstractTest
{
@Test
public void testFlowControlWithConcurrentSettings() throws Exception
{
// Initial window is 64 KiB. We allow the client to send 1024 B
// then we change the window to 512 B. At this point, the client
// must stop sending data (although the initial window allows it).
final int size = 512;
final CountDownLatch dataLatch = new CountDownLatch(1);
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
startServer(new Session.Listener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
HttpFields fields = new HttpFields();
MetaData.Response response = new MetaData.Response(200, fields);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return new Stream.Listener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
int dataFrameCount = dataFrames.incrementAndGet();
if (dataFrameCount == 1)
{
callbackRef.set(callback);
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size);
stream.getSession().settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
// Do not succeed the callback here.
}
else if (dataFrameCount > 1)
{
// Consume the data.
callback.succeeded();
dataLatch.countDown();
}
}
};
}
});
// Two SETTINGS frames, the initial one and the one we send.
final CountDownLatch settingsLatch = new CountDownLatch(2);
Session client = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
MetaData.Request request = newRequest("POST", new HttpFields());
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(new HeadersFrame(0, request, null, false), promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Send first chunk that exceeds the window.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE);
settingsLatch.await(5, TimeUnit.SECONDS);
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE);
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Consume the data arrived to server, this will resume flow control on the client.
callbackRef.get().succeeded();
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
/*
@Test
public void testServerFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final int length = 5 * windowSize;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter());
return null;
}
}), null);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings));
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final Exchanger<DataInfo> exchanger = new Exchanger<>();
session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1)
{
// Do not consume nor read from the data frame.
// We should then be flow-control stalled
exchanger.exchange(dataInfo);
}
else if (dataFrames == 2)
{
// Read but not consume, we should be flow-control stalled
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
}
else
{
Assert.fail();
}
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
});
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
// Check that we are not flow control stalled
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
}
@Test
public void testClientFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final Exchanger<DataInfo> exchanger = new Exchanger<>();
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onConnect(Session session)
{
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings), new FutureCallback());
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
return new StreamFrameListener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1)
{
// Do not consume nor read from the data frame.
// We should then be flow-control stalled
exchanger.exchange(dataInfo);
}
else if (dataFrames == 2)
{
// Read but not consume, we should be flow-control stalled
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
}
else
{
Assert.fail();
}
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
};
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
});
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), null);
final int length = 5 * windowSize;
stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter());
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
// Check that we are not flow control stalled
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
}
@Test
public void testStreamsStalledDoesNotStallOtherStreams() throws Exception
{
final int windowSize = 1024;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
stream.data(new BytesDataInfo(new byte[windowSize * 2], true), new Callback.Adapter());
return null;
}
}), null);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings));
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<DataInfo> dataInfoRef1 = new AtomicReference<>();
final AtomicReference<DataInfo> dataInfoRef2 = new AtomicReference<>();
session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int frames = dataFrames.incrementAndGet();
if (frames == 1)
{
// Do not consume it to stall flow control
dataInfoRef1.set(dataInfo);
}
else
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
}
});
session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int frames = dataFrames.incrementAndGet();
if (frames == 1)
{
// Do not consume it to stall flow control
dataInfoRef2.set(dataInfo);
}
else
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
}
});
session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
DataInfo dataInfo1 = dataInfoRef1.getAndSet(null);
if (dataInfo1 != null)
dataInfo1.consume(dataInfo1.length());
DataInfo dataInfo2 = dataInfoRef2.getAndSet(null);
if (dataInfo2 != null)
dataInfo2.consume(dataInfo2.length());
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testSendBigFileWithoutFlowControl() throws Exception
{
testSendBigFile(SPDY.V2);
}
@Test
public void testSendBigFileWithFlowControl() throws Exception
{
testSendBigFile(SPDY.V3);
}
private void testSendBigFile(short version) throws Exception
{
final int dataSize = 1024 * 1024;
final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false);
final CountDownLatch allDataReceivedLatch = new CountDownLatch(1);
Session session = startClient(version, startServer(version, new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
stream.data(bigByteBufferDataInfo, new Callback.Adapter());
return null;
}
}),new SessionFrameListener.Adapter());
session.syn(new SynInfo(new Fields(), false),new StreamFrameListener.Adapter()
{
private int dataBytesReceived;
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataBytesReceived = dataBytesReceived + dataInfo.length();
dataInfo.consume(dataInfo.length());
if (dataBytesReceived == dataSize)
allDataReceivedLatch.countDown();
}
});
assertThat("all data bytes have been received by the client", allDataReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)
{
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
}
private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try
{
command.call();
Assert.fail();
}
catch (Exception x)
{
Assert.assertSame(exception, x.getClass());
}
}
*/
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
@ -30,83 +29,30 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HTTP2Test
public class HTTP2Test extends AbstractTest
{
private Server server;
private ServerConnector connector;
private String path;
private HTTP2Client client;
private void startServer(HttpServlet servlet) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration()));
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/");
path = "/test";
context.addServlet(new ServletHolder(servlet), path);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client(clientExecutor);
server.addBean(client);
server.start();
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testRequestNoContentResponseNoContent() throws Exception
{
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
}
});
startServer(new EmptyHttpServlet());
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, new Session.Listener.Adapter(), promise);
Session session = promise.get();
Session client = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields);
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
client.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
@ -140,19 +86,13 @@ public class HTTP2Test
}
});
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, new Session.Listener.Adapter(), promise);
Session session = promise.get();
Session client = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields);
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(2);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
client.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
@ -177,6 +117,7 @@ public class HTTP2Test
Assert.assertTrue(frame.isEndStream());
Assert.assertEquals(ByteBuffer.wrap(content), frame.getData());
callback.succeeded();
latch.countDown();
}
});

View File

@ -18,11 +18,21 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
public interface FlowControl
{
public void onNewStream(IStream stream);
public int getWindowSize(ISession session);
public int getInitialWindowSize();
public void setWindowSize(ISession session, int windowSize);
public void updateInitialWindowSize(ISession session, int initialWindowSize);
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);
public void onDataReceived(ISession session, IStream stream, int length);
public void onDataConsumed(ISession session, IStream stream, int length);
public void onDataSent(ISession session, IStream stream, int length);
}

View File

@ -18,21 +18,84 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
public class HTTP2FlowControl implements FlowControl
{
private volatile int initialWindowSize;
public HTTP2FlowControl(int initialWindowSize)
{
this.initialWindowSize = initialWindowSize;
}
@Override
public void onNewStream(IStream stream)
{
stream.updateWindowSize(initialWindowSize);
}
@Override
public int getWindowSize(ISession session)
public int getInitialWindowSize()
{
return 0;
return initialWindowSize;
}
@Override
public void setWindowSize(ISession session, int windowSize)
public void updateInitialWindowSize(ISession session, int initialWindowSize)
{
int windowSize = this.initialWindowSize;
this.initialWindowSize = initialWindowSize;
int delta = initialWindowSize - windowSize;
// Update the sessions's window size.
session.updateWindowSize(delta);
// Update the streams' window size.
for (Stream stream : session.getStreams())
((IStream)stream).updateWindowSize(delta);
}
@Override
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
if (frame.getStreamId() > 0)
{
if (stream != null)
stream.updateWindowSize(frame.getWindowDelta());
}
else
{
session.updateWindowSize(frame.getWindowDelta());
}
}
@Override
public void onDataReceived(ISession session, IStream stream, int length)
{
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
{
// This is the algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed.
// We currently send a WindowUpdate every time, even if the frame was very small.
// Other policies may send the WindowUpdate only upon reaching a threshold.
// Negative streamId allow for generation of bytes for both stream and session
int streamId = stream != null ? -stream.getId() : 0;
WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length);
session.frame(stream, frame, Callback.Adapter.INSTANCE);
}
@Override
public void onDataSent(ISession session, IStream stream, int length)
{
stream.getSession().updateWindowSize(length);
stream.updateWindowSize(length);
}
}

View File

@ -24,9 +24,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -45,6 +47,7 @@ import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Atomics;
@ -72,23 +75,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final Flusher flusher = new Flusher();
private final AtomicInteger windowSize = new AtomicInteger();
private final EndPoint endPoint;
private final Generator generator;
private final Listener listener;
private final FlowControl flowControl;
private final int initialWindowSize;
private final Flusher flusher;
private volatile int maxStreamCount;
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize, int initialStreamId)
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialStreamId)
{
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.initialWindowSize = initialWindowSize;
this.flusher = new Flusher(4);
this.maxStreamCount = -1;
this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize());
}
public Generator getGenerator()
@ -96,11 +100,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return generator;
}
public int getInitialWindowSize()
{
return initialWindowSize;
}
public int getMaxStreamCount()
{
return maxStreamCount;
@ -112,14 +111,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public boolean onData(DataFrame frame)
public boolean onData(final DataFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
final IStream stream = getStream(streamId);
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
return stream.process(frame);
flowControl.onDataReceived(this, stream, frame.getFlowControlledLength());
return stream.process(frame, new Callback.Adapter()
{
@Override
public void succeeded()
{
int consumed = frame.getFlowControlledLength();
LOG.debug("Flow control: {} consumed on {}", consumed, stream);
flowControl.onDataConsumed(HTTP2Session.this, stream, consumed);
}
});
}
else
{
@ -156,8 +165,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
{
int windowSize = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
setWindowSize(windowSize);
LOG.debug("Updated window size to {}", windowSize);
flowControl.updateInitialWindowSize(this, windowSize);
LOG.debug("Updated initial window size to {}", windowSize);
}
// TODO: handle other settings
notifySettings(this, frame);
@ -202,6 +211,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public boolean onWindowUpdate(WindowUpdateFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = null;
if (streamId > 0)
stream = getStream(streamId);
flowControl.onWindowUpdate(this, stream, frame);
// Flush stalled data.
flusher.iterate();
return false;
}
@ -231,7 +248,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);
flusher.offer(generator.generate(frame, new PromiseCallback<>(promise, stream)));
FlusherEntry entry = new FlusherEntry(stream, frame, new PromiseCallback<>(promise, stream));
flusher.offer(entry);
}
// Iterate outside the synchronized block.
flusher.iterate();
@ -240,19 +259,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void settings(SettingsFrame frame, Callback callback)
{
frame(frame, callback);
frame(null, frame, callback);
}
@Override
public void ping(PingFrame frame, Callback callback)
{
frame(frame, callback);
frame(null, frame, callback);
}
@Override
public void reset(ResetFrame frame, Callback callback)
{
frame(frame, callback);
frame(null, frame, callback);
}
@Override
@ -261,14 +280,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
LOG.debug("Sending {}: {}", frame.getType(), reason);
frame(frame, callback);
frame(null, frame, callback);
}
@Override
public void frame(Frame frame, Callback callback)
public void frame(IStream stream, Frame frame, Callback callback)
{
Generator.LeaseCallback lease = generator.generate(frame, callback);
flusher.flush(lease);
int flowControlledLength = frame.getFlowControlledLength();
if (flowControlledLength > 0)
callback = new FlowControlCallback(stream, flowControlledLength, callback);
// We want to generate as late as possible to allow re-prioritization.
FlusherEntry entry = new FlusherEntry(stream, frame, callback);
LOG.debug("Sending {}", frame);
flusher.flush(entry);
}
protected void disconnect()
@ -281,9 +305,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
IStream stream = newStream(frame);
int streamId = stream.getId();
updateLastStreamId(streamId);
if (streams.putIfAbsent(streamId, stream) == null)
{
flowControl.onNewStream(stream);
LOG.debug("Created local {}", stream);
return stream;
}
@ -317,6 +341,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
flowControl.onNewStream(stream);
LOG.debug("Created remote {}", stream);
return stream;
}
@ -359,16 +384,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return streams.get(streamId);
}
protected int getWindowSize()
{
return windowSize.get();
}
@Override
public int updateWindowSize(int delta)
{
int oldSize = windowSize.getAndAdd(delta);
LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this);
return oldSize;
}
private void updateLastStreamId(int streamId)
{
Atomics.updateMax(lastStreamId, streamId);
}
public void setWindowSize(int initialWindowSize)
{
flowControl.setWindowSize(this, initialWindowSize);
}
protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
{
try
@ -394,13 +427,30 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
@Override
public String toString()
{
return String.format("%s@%x{queueSize=%d,windowSize=%s,streams=%d}", getClass().getSimpleName(),
hashCode(), flusher.getQueueSize(), windowSize, streams.size());
}
private class Flusher extends IteratingCallback
{
private final Queue<Generator.LeaseCallback> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH);
private Generator.LeaseCallback active;
private final ArrayQueue<FlusherEntry> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH);
private final Set<IStream> stalled = new HashSet<>();
private final List<FlusherEntry> reset = new ArrayList<>();
private final ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool());
private final int maxGather;
private final List<FlusherEntry> active;
private boolean closed;
private void offer(Generator.LeaseCallback lease)
public Flusher(int maxGather)
{
this.maxGather = maxGather;
this.active = new ArrayList<>(maxGather);
}
private void offer(FlusherEntry entry)
{
boolean fail = false;
synchronized (queue)
@ -408,31 +458,108 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (closed)
fail = true;
else
queue.offer(lease);
queue.offer(entry);
}
if (fail)
fail(lease);
closed(entry);
}
private void flush(Generator.LeaseCallback lease)
public int getQueueSize()
{
offer(lease);
synchronized (queue)
{
return queue.size();
}
}
private void flush(FlusherEntry entry)
{
offer(entry);
iterate();
}
@Override
protected Action process() throws Exception
{
Generator.LeaseCallback current = null;
synchronized (queue)
{
if (!closed)
current = active = queue.poll();
if (closed)
return Action.IDLE;
int nonStalledIndex = 0;
int size = queue.size();
while (nonStalledIndex < size)
{
FlusherEntry entry = queue.get(nonStalledIndex);
IStream stream = entry.getStream();
boolean flowControlled = entry.getFrame().getFlowControlledLength() > 0;
if (flowControlled)
{
// Is the session stalled ?
if (getWindowSize() <= 0)
{
LOG.debug("Flow control: session stalled {}", HTTP2Session.this);
++nonStalledIndex;
// There may be *non* flow controlled frames to send.
continue;
}
if (stream != null)
{
// Is it a frame belonging to an already stalled stream ?
if (stalled.contains(stream))
{
++nonStalledIndex;
continue;
}
// Is the stream stalled ?
if (stream.getWindowSize() <= 0)
{
LOG.debug("Flow control: stream stalled {}", stream);
stalled.add(stream);
++nonStalledIndex;
continue;
}
}
}
// We will be possibly writing this frame.
queue.remove(nonStalledIndex);
--size;
// Has the stream been reset ?
if (stream != null && stream.isReset() && flowControlled)
{
reset.add(entry);
continue;
}
active.add(entry);
if (active.size() == maxGather)
break;
}
stalled.clear();
}
if (current == null)
for (int i = 0; i < reset.size(); ++i)
{
FlusherEntry entry = reset.get(i);
// TODO: introduce a StreamResetException ?
entry.failed(new IllegalStateException());
}
reset.clear();
if (active.isEmpty())
return Action.IDLE;
List<ByteBuffer> byteBuffers = current.getByteBuffers();
for (int i = 0; i < active.size(); ++i)
{
FlusherEntry entry = active.get(i);
generator.generate(lease, entry.getFrame());
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
endPoint.write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
return Action.SCHEDULED;
}
@ -440,25 +567,38 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void succeeded()
{
active.succeeded();
lease.recycle();
for (int i = 0; i < active.size(); ++i)
{
FlusherEntry entry = active.get(i);
entry.succeeded();
}
active.clear();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
active.failed(x);
lease.recycle();
for (int i = 0; i < active.size(); ++i)
{
FlusherEntry entry = active.get(i);
entry.failed(x);
}
active.clear();
super.failed(x);
}
@Override
protected void completed()
{
throw new IllegalStateException();
}
public void close()
{
Queue<Generator.LeaseCallback> queued;
Queue<FlusherEntry> queued;
synchronized (queue)
{
closed = true;
@ -467,19 +607,55 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
while (true)
{
Generator.LeaseCallback item = queued.poll();
FlusherEntry item = queued.poll();
if (item == null)
break;
fail(item);
closed(item);
}
}
protected void fail(Generator.LeaseCallback item)
protected void closed(FlusherEntry item)
{
item.failed(new ClosedChannelException());
}
}
private class FlusherEntry implements Callback
{
private final IStream stream;
private final Frame frame;
private final Callback callback;
private FlusherEntry(IStream stream, Frame frame, Callback callback)
{
this.stream = stream;
this.frame = frame;
this.callback = callback;
}
public IStream getStream()
{
return stream;
}
public Frame getFrame()
{
return frame;
}
@Override
public void succeeded()
{
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
public class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;
@ -503,4 +679,31 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
promise.failed(x);
}
}
private class FlowControlCallback implements Callback
{
private final IStream stream;
private final int length;
private final Callback callback;
private FlowControlCallback(IStream stream, int length, Callback callback)
{
this.stream = stream;
this.length = length;
this.callback = callback;
}
@Override
public void succeeded()
{
flowControl.onDataSent(HTTP2Session.this, stream, -length);
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -35,9 +36,11 @@ public class HTTP2Stream implements IStream
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicInteger windowSize = new AtomicInteger();
private final ISession session;
private final HeadersFrame frame;
private Listener listener;
private volatile boolean reset = false;
public HTTP2Stream(ISession session, HeadersFrame frame)
{
@ -60,13 +63,13 @@ public class HTTP2Stream implements IStream
@Override
public void headers(HeadersFrame frame, Callback callback)
{
session.frame(frame, callback);
session.frame(this, frame, callback);
}
@Override
public void data(DataFrame frame, Callback callback)
{
session.frame(frame, callback);
session.frame(this, frame, callback);
}
@Override
@ -87,6 +90,12 @@ public class HTTP2Stream implements IStream
return attributes().remove(key);
}
@Override
public boolean isReset()
{
return reset;
}
@Override
public boolean isClosed()
{
@ -120,20 +129,27 @@ public class HTTP2Stream implements IStream
}
@Override
public boolean process(Frame frame)
public boolean process(Frame frame, Callback callback)
{
switch (frame.getType())
{
case DATA:
{
return notifyData((DataFrame)frame);
return notifyData((DataFrame)frame, callback);
}
case HEADERS:
{
return false;
}
case RST_STREAM:
{
reset = true;
return false;
}
default:
{
throw new UnsupportedOperationException();
}
}
}
@ -182,27 +198,28 @@ public class HTTP2Stream implements IStream
}
}
protected boolean notifyData(DataFrame frame)
@Override
public int getWindowSize()
{
return windowSize.get();
}
@Override
public int updateWindowSize(int delta)
{
int oldSize = windowSize.getAndAdd(delta);
LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this);
return oldSize;
}
protected boolean notifyData(DataFrame frame, Callback callback)
{
final Listener listener = this.listener;
if (listener == null)
return false;
try
{
listener.onData(this, frame, new Callback()
{
@Override
public void succeeded()
{
// TODO: notify flow control
}
@Override
public void failed(Throwable x)
{
// TODO: bail out
}
});
listener.onData(this, frame, callback);
return false;
}
catch (Throwable x)
@ -215,7 +232,8 @@ public class HTTP2Stream implements IStream
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x{id=%d,windowSize=%s,%s}", getClass().getSimpleName(),
hashCode(), getId(), windowSize, closeState);
}
private enum CloseState

View File

@ -27,5 +27,7 @@ public interface ISession extends Session
@Override
IStream getStream(int streamId);
public void frame(Frame frame, Callback callback);
public void frame(IStream stream, Frame frame, Callback callback);
public int updateWindowSize(int delta);
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Callback;
public interface IStream extends Stream
{
@ -30,7 +31,7 @@ public interface IStream extends Stream
public void setListener(Listener listener);
public boolean process(Frame frame);
public boolean process(Frame frame, Callback callback);
/**
* Updates the close state of this stream.
@ -42,4 +43,8 @@ public interface IStream extends Stream
* that ends the stream).
*/
public void updateClose(boolean update, boolean local);
public int getWindowSize();
public int updateWindowSize(int delta);
}

View File

@ -38,6 +38,8 @@ public interface Stream
public Object removeAttribute(String key);
public boolean isReset();
public boolean isClosed();
// TODO: see SPDY's Stream
@ -63,6 +65,7 @@ public interface Stream
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
}
@Override

View File

@ -24,14 +24,21 @@ public class DataFrame extends Frame
{
private final int streamId;
private final ByteBuffer data;
private boolean endStream;
private final boolean endStream;
private final int length;
public DataFrame(int streamId, ByteBuffer data, boolean endStream)
{
this(streamId, data, endStream, 0);
}
public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding)
{
super(FrameType.DATA);
this.streamId = streamId;
this.data = data;
this.endStream = endStream;
this.length = data.remaining() + padding;
}
public int getStreamId()
@ -48,4 +55,16 @@ public class DataFrame extends Frame
{
return endStream;
}
@Override
public int getFlowControlledLength()
{
return length;
}
@Override
public String toString()
{
return String.format("%s{length:%d/%d}", super.toString(), data.remaining(), length);
}
}

View File

@ -23,7 +23,7 @@ public abstract class Frame
public static final int HEADER_LENGTH = 8;
public static final int MAX_LENGTH = 0x3F_FF;
private FrameType type;
private final FrameType type;
protected Frame(FrameType type)
{
@ -35,6 +35,11 @@ public abstract class Frame
return type;
}
public int getFlowControlledLength()
{
return 0;
}
@Override
public String toString()
{

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class DataGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class DataGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
DataFrame dataFrame = (DataFrame)frame;
generateData(lease, dataFrame.getStreamId(), dataFrame.getData(), dataFrame.isEndStream(), false, null);

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
public abstract class FrameGenerator
{
@ -34,7 +33,7 @@ public abstract class FrameGenerator
this.headerGenerator = headerGenerator;
}
public abstract void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback);
public abstract void generate(ByteBufferPool.Lease lease, Frame frame);
protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId)
{

View File

@ -22,7 +22,6 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.hpack.HpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
public class Generator
{
@ -59,40 +58,18 @@ public class Generator
}
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public int getHeaderTableSize()
{
return headerTableSize;
}
public LeaseCallback generate(Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
LeaseCallback lease = new LeaseCallback(byteBufferPool, callback);
generators[frame.getType().getType()].generate(lease, frame, callback);
return lease;
}
public static class LeaseCallback extends ByteBufferPool.Lease implements Callback
{
private final Callback callback;
public LeaseCallback(ByteBufferPool byteBufferPool, Callback callback)
{
super(byteBufferPool);
this.callback = callback;
}
@Override
public void succeeded()
{
recycle();
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
recycle();
callback.failed(x);
}
generators[frame.getType().getType()].generate(lease, frame);
}
}

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class GoAwayGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class GoAwayGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload());

View File

@ -28,7 +28,6 @@ import org.eclipse.jetty.http2.hpack.HpackEncoder;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class HeadersGenerator extends FrameGenerator
{
@ -41,7 +40,7 @@ public class HeadersGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
HeadersFrame headersFrame = (HeadersFrame)frame;
generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream(), null);

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class PingGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class PingGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
PingFrame pingFrame = (PingFrame)frame;
generatePing(lease, pingFrame.getPayload(), pingFrame.isReply());

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class PriorityGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class PriorityGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
PriorityFrame priorityFrame = (PriorityFrame)frame;
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class ResetGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class ResetGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
ResetFrame resetFrame = (ResetFrame)frame;
generateReset(lease, resetFrame.getStreamId(), resetFrame.getError());

View File

@ -27,7 +27,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class SettingsGenerator extends FrameGenerator
{
@ -37,7 +36,7 @@ public class SettingsGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class WindowUpdateGenerator extends FrameGenerator
{
@ -36,7 +35,7 @@ public class WindowUpdateGenerator extends FrameGenerator
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback)
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame;
generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta());
@ -44,15 +43,28 @@ public class WindowUpdateGenerator extends FrameGenerator
public void generateWindowUpdate(ByteBufferPool.Lease lease, int streamId, int windowUpdate)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
if (windowUpdate < 0)
throw new IllegalArgumentException("Invalid window update: " + windowUpdate);
// A negative streamId means that we have to generate
// bytes for both the stream and session frames.
boolean both = false;
if (streamId < 0)
{
both = true;
streamId = -streamId;
}
if (both)
{
ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flag.NONE, 0);
header.putInt(windowUpdate);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
}
ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flag.NONE, streamId);
header.putInt(windowUpdate);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
}

View File

@ -49,7 +49,7 @@ public class DataBodyParser extends BodyParser
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "invalid_data_frame");
return false;
}
return onData(BufferUtil.EMPTY_BUFFER, false);
return onData(BufferUtil.EMPTY_BUFFER, false, 0);
}
@Override
@ -116,7 +116,7 @@ public class DataBodyParser extends BodyParser
if (length == 0)
{
state = State.PADDING;
if (onData(slice, false))
if (onData(slice, false, paddingLength))
{
return Result.ASYNC;
}
@ -125,7 +125,7 @@ public class DataBodyParser extends BodyParser
{
// TODO: check the semantic of Flag.END_SEGMENT.
// We got partial data, simulate a smaller frame, and stay in DATA state.
if (onData(slice, true))
if (onData(slice, true, 0))
{
return Result.ASYNC;
}
@ -153,9 +153,9 @@ public class DataBodyParser extends BodyParser
return Result.PENDING;
}
private boolean onData(ByteBuffer buffer, boolean fragment)
private boolean onData(ByteBuffer buffer, boolean fragment, int padding)
{
DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream());
DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream(), padding);
return notifyData(frame);
}

View File

@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{
private int headerTableSize = 4096;
private int initialWindowSize = 65535;
public AbstractHTTP2ServerConnectionFactory()
{
super("h2-12");
}
public int getHeaderTableSize()
{
return headerTableSize;
}
public void setHeaderTableSize(int headerTableSize)
{
this.headerTableSize = headerTableSize;
}
public int getInitialWindowSize()
{
return initialWindowSize;
}
public void setInitialWindowSize(int initialWindowSize)
{
this.initialWindowSize = initialWindowSize;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
Session.Listener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener,
new HTTP2FlowControl(getInitialWindowSize()));
Parser parser = new ServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, getInputBufferSize());
return configure(connection, connector, endPoint);
}
protected abstract Session.Listener newSessionListener(Connector connector, EndPoint endPoint);
}

View File

@ -18,75 +18,35 @@
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2ServerConnectionFactory extends AbstractConnectionFactory
public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory
{
private static final Logger LOG = Log.getLogger(HTTP2ServerConnectionFactory.class);
private static final String CHANNEL_ATTRIBUTE = HttpChannelOverHTTP2.class.getName();
private final HttpConfiguration httpConfiguration;
private int headerTableSize = 4096;
private int initialWindowSize = 65535;
public HTTP2ServerConnectionFactory(HttpConfiguration httpConfiguration)
{
super("h2-12");
this.httpConfiguration = httpConfiguration;
}
public int getHeaderTableSize()
{
return headerTableSize;
}
public void setHeaderTableSize(int headerTableSize)
{
this.headerTableSize = headerTableSize;
}
public int getInitialWindowSize()
{
return initialWindowSize;
}
public void setInitialWindowSize(int initialWindowSize)
{
this.initialWindowSize = initialWindowSize;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint)
{
Session.Listener listener = new HTTPServerSessionListener(connector, httpConfiguration, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener, new HTTP2FlowControl(),
getInitialWindowSize());
Parser parser = new ServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, getInputBufferSize());
return configure(connection, connector, endPoint);
return new HTTPServerSessionListener(connector, httpConfiguration, endPoint);
}
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener

View File

@ -29,12 +29,13 @@ import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Listener
{
public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize)
public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
{
super(endPoint, generator, listener, flowControl, initialWindowSize, 2);
super(endPoint, generator, listener, flowControl, 2);
}
@Override
@ -43,7 +44,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
// SPEC: send a SETTINGS frame upon receiving the preface.
HashMap<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getGenerator().getHeaderTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialWindowSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getFlowControl().getInitialWindowSize());
int maxConcurrentStreams = getMaxStreamCount();
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
@ -59,7 +60,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame);
stream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
// The listener may have sent a frame that closed the stream.

View File

@ -0,0 +1,39 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory
{
private final Session.Listener listener;
public RawHTTP2ServerConnectionFactory(Session.Listener listener)
{
this.listener = listener;
}
@Override
protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint)
{
return listener;
}
}

View File

@ -56,7 +56,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -105,7 +104,8 @@ public class HTTP2ServerTest
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(),
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
// No preface bytes
@ -153,7 +153,8 @@ public class HTTP2ServerTest
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(),
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))
@ -215,7 +216,8 @@ public class HTTP2ServerTest
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(),
host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))