diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 626ce7998ff..b5853973e22 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -51,8 +51,6 @@ import org.eclipse.jetty.util.thread.Scheduler; public class HTTP2Client extends ContainerLifeCycle { private final Queue sessions = new ConcurrentLinkedQueue<>(); - private final Executor executor; - private final Scheduler scheduler; private final SelectorManager selector; private final ByteBufferPool byteBufferPool; @@ -63,9 +61,8 @@ public class HTTP2Client extends ContainerLifeCycle public HTTP2Client(Executor executor) { - this.executor = executor; addBean(executor); - this.scheduler = new ScheduledExecutorScheduler(); + Scheduler scheduler = new ScheduledExecutorScheduler(); addBean(scheduler, true); this.selector = new ClientSelectorManager(executor, scheduler); addBean(selector, true); @@ -121,7 +118,7 @@ public class HTTP2Client extends ContainerLifeCycle { Context context = (Context)attachment; Generator generator = new Generator(byteBufferPool, 4096); - HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(), 65535); + HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(65535)); Parser parser = new Parser(byteBufferPool, session); Connection connection = new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, 8192, session); context.promise.succeeded(session); diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index a4a00885100..7625dd748c8 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.parser.ErrorCode; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -34,9 +35,9 @@ public class HTTP2ClientSession extends HTTP2Session { private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class); - public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize) + public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl) { - super(endPoint, generator, listener, flowControl, initialWindowSize, 1); + super(endPoint, generator, listener, flowControl, 1); } @Override @@ -52,7 +53,7 @@ public class HTTP2ClientSession extends HTTP2Session else { stream.updateClose(frame.isEndStream(), false); - stream.process(frame); + stream.process(frame, Callback.Adapter.INSTANCE); notifyHeaders(stream, frame); if (stream.isClosed()) removeStream(stream, false); diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java new file mode 100644 index 00000000000..8faf26193c7 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java @@ -0,0 +1,106 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServlet; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.hpack.MetaData; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; + +public class AbstractTest +{ + private ServerConnector connector; + private String path = "/test"; + private HTTP2Client client; + private Server server; + + protected void startServer(HttpServlet servlet) throws Exception + { + prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration())); + + ServletContextHandler context = new ServletContextHandler(server, "/"); + context.addServlet(new ServletHolder(servlet), path); + + prepareClient(); + server.start(); + } + + protected void startServer(Session.Listener listener) throws Exception + { + prepareServer(new RawHTTP2ServerConnectionFactory(listener)); + prepareClient(); + server.start(); + } + + private void prepareServer(ConnectionFactory connectionFactory) + { + QueuedThreadPool serverExecutor = new QueuedThreadPool(); + serverExecutor.setName("server"); + server = new Server(serverExecutor); + connector = new ServerConnector(server, connectionFactory); + server.addConnector(connector); + } + + private void prepareClient() + { + QueuedThreadPool clientExecutor = new QueuedThreadPool(); + clientExecutor.setName("client"); + client = new HTTP2Client(clientExecutor); + server.addBean(client); + } + + protected Session newClient(Session.Listener listener) throws Exception + { + String host = "localhost"; + int port = connector.getLocalPort(); + InetSocketAddress address = new InetSocketAddress(host, port); + FuturePromise promise = new FuturePromise<>(); + client.connect(address, listener, promise); + return promise.get(5, TimeUnit.SECONDS); + } + + @After + public void dispose() throws Exception + { + server.stop(); + } + + protected MetaData.Request newRequest(String method, HttpFields fields) + { + String host = "localhost"; + int port = connector.getLocalPort(); + String authority = host + ":" + port; + return new MetaData.Request(HttpScheme.HTTP, method, authority, host, port, path, fields); + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/EmptyHttpServlet.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/EmptyHttpServlet.java new file mode 100644 index 00000000000..588994c04a1 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/EmptyHttpServlet.java @@ -0,0 +1,33 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class EmptyHttpServlet extends HttpServlet +{ + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java new file mode 100644 index 00000000000..327cdde1c89 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java @@ -0,0 +1,495 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.hpack.MetaData; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.junit.Assert; +import org.junit.Test; + +public class FlowControlTest extends AbstractTest +{ + @Test + public void testFlowControlWithConcurrentSettings() throws Exception + { + // Initial window is 64 KiB. We allow the client to send 1024 B + // then we change the window to 512 B. At this point, the client + // must stop sending data (although the initial window allows it). + + final int size = 512; + final CountDownLatch dataLatch = new CountDownLatch(1); + final AtomicReference callbackRef = new AtomicReference<>(); + startServer(new Session.Listener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame) + { + HttpFields fields = new HttpFields(); + MetaData.Response response = new MetaData.Response(200, fields); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); + stream.headers(responseFrame, Callback.Adapter.INSTANCE); + + return new Stream.Listener.Adapter() + { + private final AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + int dataFrameCount = dataFrames.incrementAndGet(); + if (dataFrameCount == 1) + { + callbackRef.set(callback); + Map settings = new HashMap<>(); + settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size); + stream.getSession().settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); + // Do not succeed the callback here. + } + else if (dataFrameCount > 1) + { + // Consume the data. + callback.succeeded(); + dataLatch.countDown(); + } + } + }; + } + }); + + // Two SETTINGS frames, the initial one and the one we send. + final CountDownLatch settingsLatch = new CountDownLatch(2); + Session client = newClient(new Session.Listener.Adapter() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + }); + + MetaData.Request request = newRequest("POST", new HttpFields()); + FuturePromise promise = new FuturePromise<>(); + client.newStream(new HeadersFrame(0, request, null, false), promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + + // Send first chunk that exceeds the window. + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE); + settingsLatch.await(5, TimeUnit.SECONDS); + + // Send the second chunk of data, must not arrive since we're flow control stalled on the client. + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE); + Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); + + // Consume the data arrived to server, this will resume flow control on the client. + callbackRef.get().succeeded(); + + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + +/* + @Test + public void testServerFlowControlOneBigWrite() throws Exception + { + final int windowSize = 1536; + final int length = 5 * windowSize; + final CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter() + { + @Override + public void onSettings(Session session, SettingsInfo settingsInfo) + { + settingsLatch.countDown(); + } + + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false), new Callback.Adapter()); + stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter()); + return null; + } + }), null); + + Settings settings = new Settings(); + settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize)); + session.settings(new SettingsInfo(settings)); + + Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + final Exchanger exchanger = new Exchanger<>(); + session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() + { + private AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + try + { + int dataFrames = this.dataFrames.incrementAndGet(); + if (dataFrames == 1) + { + // Do not consume nor read from the data frame. + // We should then be flow-control stalled + exchanger.exchange(dataInfo); + } + else if (dataFrames == 2) + { + // Read but not consume, we should be flow-control stalled + dataInfo.asByteBuffer(false); + exchanger.exchange(dataInfo); + } + else if (dataFrames == 3) + { + // Consume partially, we should be flow-control stalled + dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2)); + exchanger.exchange(dataInfo); + } + else if (dataFrames == 4 || dataFrames == 5) + { + // Consume totally + dataInfo.asByteBuffer(true); + exchanger.exchange(dataInfo); + } + else + { + Assert.fail(); + } + } + catch (InterruptedException x) + { + throw new SPDYException(x); + } + } + }); + + DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(windowSize, dataInfo.available()); + Assert.assertEquals(0, dataInfo.consumed()); + dataInfo.asByteBuffer(true); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(0, dataInfo.available()); + Assert.assertEquals(0, dataInfo.consumed()); + dataInfo.consume(dataInfo.length()); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed()); + dataInfo.asByteBuffer(true); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + Assert.assertEquals(dataInfo.length(), dataInfo.consumed()); + // Check that we are not flow control stalled + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + Assert.assertEquals(dataInfo.length(), dataInfo.consumed()); + } + + @Test + public void testClientFlowControlOneBigWrite() throws Exception + { + final int windowSize = 1536; + final Exchanger exchanger = new Exchanger<>(); + final CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter() + { + @Override + public void onConnect(Session session) + { + Settings settings = new Settings(); + settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize)); + session.settings(new SettingsInfo(settings), new FutureCallback()); + } + + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false), new Callback.Adapter()); + return new StreamFrameListener.Adapter() + { + private AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + try + { + int dataFrames = this.dataFrames.incrementAndGet(); + if (dataFrames == 1) + { + // Do not consume nor read from the data frame. + // We should then be flow-control stalled + exchanger.exchange(dataInfo); + } + else if (dataFrames == 2) + { + // Read but not consume, we should be flow-control stalled + dataInfo.asByteBuffer(false); + exchanger.exchange(dataInfo); + } + else if (dataFrames == 3) + { + // Consume partially, we should be flow-control stalled + dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2)); + exchanger.exchange(dataInfo); + } + else if (dataFrames == 4 || dataFrames == 5) + { + // Consume totally + dataInfo.asByteBuffer(true); + exchanger.exchange(dataInfo); + } + else + { + Assert.fail(); + } + } + catch (InterruptedException x) + { + throw new SPDYException(x); + } + } + }; + } + }), new SessionFrameListener.Adapter() + { + @Override + public void onSettings(Session session, SettingsInfo settingsInfo) + { + settingsLatch.countDown(); + } + }); + + Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), null); + final int length = 5 * windowSize; + stream.data(new BytesDataInfo(new byte[length], true), new Callback.Adapter()); + + DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(windowSize, dataInfo.available()); + Assert.assertEquals(0, dataInfo.consumed()); + dataInfo.asByteBuffer(true); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(0, dataInfo.available()); + Assert.assertEquals(0, dataInfo.consumed()); + dataInfo.consume(dataInfo.length()); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + checkThatWeAreFlowControlStalled(exchanger); + + Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed()); + dataInfo.asByteBuffer(true); + + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + Assert.assertEquals(dataInfo.length(), dataInfo.consumed()); + // Check that we are not flow control stalled + dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); + Assert.assertEquals(dataInfo.length(), dataInfo.consumed()); + } + + @Test + public void testStreamsStalledDoesNotStallOtherStreams() throws Exception + { + final int windowSize = 1024; + final CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter() + { + @Override + public void onSettings(Session session, SettingsInfo settingsInfo) + { + settingsLatch.countDown(); + } + + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false), new Callback.Adapter()); + stream.data(new BytesDataInfo(new byte[windowSize * 2], true), new Callback.Adapter()); + return null; + } + }), null); + Settings settings = new Settings(); + settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize)); + session.settings(new SettingsInfo(settings)); + + Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + final CountDownLatch latch = new CountDownLatch(3); + final AtomicReference dataInfoRef1 = new AtomicReference<>(); + final AtomicReference dataInfoRef2 = new AtomicReference<>(); + session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter() + { + private final AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + int frames = dataFrames.incrementAndGet(); + if (frames == 1) + { + // Do not consume it to stall flow control + dataInfoRef1.set(dataInfo); + } + else + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + latch.countDown(); + } + } + }); + session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter() + { + private final AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + int frames = dataFrames.incrementAndGet(); + if (frames == 1) + { + // Do not consume it to stall flow control + dataInfoRef2.set(dataInfo); + } + else + { + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + latch.countDown(); + } + } + }); + session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), true, (byte)0), new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + DataInfo dataInfo1 = dataInfoRef1.getAndSet(null); + if (dataInfo1 != null) + dataInfo1.consume(dataInfo1.length()); + DataInfo dataInfo2 = dataInfoRef2.getAndSet(null); + if (dataInfo2 != null) + dataInfo2.consume(dataInfo2.length()); + dataInfo.consume(dataInfo.length()); + if (dataInfo.isClose()) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testSendBigFileWithoutFlowControl() throws Exception + { + testSendBigFile(SPDY.V2); + } + + @Test + public void testSendBigFileWithFlowControl() throws Exception + { + testSendBigFile(SPDY.V3); + } + + private void testSendBigFile(short version) throws Exception + { + final int dataSize = 1024 * 1024; + final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false); + final CountDownLatch allDataReceivedLatch = new CountDownLatch(1); + + Session session = startClient(version, startServer(version, new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false), new Callback.Adapter()); + stream.data(bigByteBufferDataInfo, new Callback.Adapter()); + return null; + } + }),new SessionFrameListener.Adapter()); + + session.syn(new SynInfo(new Fields(), false),new StreamFrameListener.Adapter() + { + private int dataBytesReceived; + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataBytesReceived = dataBytesReceived + dataInfo.length(); + dataInfo.consume(dataInfo.length()); + if (dataBytesReceived == dataSize) + allDataReceivedLatch.countDown(); + } + }); + + assertThat("all data bytes have been received by the client", allDataReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + } + + private void checkThatWeAreFlowControlStalled(final Exchanger exchanger) + { + expectException(TimeoutException.class, new Callable() + { + @Override + public DataInfo call() throws Exception + { + return exchanger.exchange(null, 1, TimeUnit.SECONDS); + } + }); + } + + private void expectException(Class exception, Callable command) + { + try + { + command.call(); + Assert.fail(); + } + catch (Exception x) + { + Assert.assertSame(exception, x.getClass()); + } + } +*/ +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java index 3af73779ed0..4e7e91d5254 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.http2.client; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; @@ -30,83 +29,30 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.hpack.MetaData; -import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; -import org.eclipse.jetty.server.HttpConfiguration; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.junit.After; import org.junit.Assert; import org.junit.Test; -public class HTTP2Test +public class HTTP2Test extends AbstractTest { - private Server server; - private ServerConnector connector; - private String path; - private HTTP2Client client; - - private void startServer(HttpServlet servlet) throws Exception - { - QueuedThreadPool serverExecutor = new QueuedThreadPool(); - serverExecutor.setName("server"); - server = new Server(serverExecutor); - connector = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration())); - server.addConnector(connector); - - ServletContextHandler context = new ServletContextHandler(server, "/"); - path = "/test"; - context.addServlet(new ServletHolder(servlet), path); - - QueuedThreadPool clientExecutor = new QueuedThreadPool(); - clientExecutor.setName("client"); - client = new HTTP2Client(clientExecutor); - server.addBean(client); - - server.start(); - } - - @After - public void dispose() throws Exception - { - server.stop(); - } - @Test public void testRequestNoContentResponseNoContent() throws Exception { - startServer(new HttpServlet() - { - @Override - protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException - { - } - }); + startServer(new EmptyHttpServlet()); - String host = "localhost"; - int port = connector.getLocalPort(); - String authority = host + ":" + port; - InetSocketAddress address = new InetSocketAddress(host, port); - FuturePromise promise = new FuturePromise<>(); - client.connect(address, new Session.Listener.Adapter(), promise); - Session session = promise.get(); + Session client = newClient(new Session.Listener.Adapter()); HttpFields fields = new HttpFields(); - MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields); + MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame = new HeadersFrame(1, metaData, null, true); final CountDownLatch latch = new CountDownLatch(1); - session.newStream(frame, new Promise.Adapter(), new Stream.Listener.Adapter() + client.newStream(frame, new Promise.Adapter(), new Stream.Listener.Adapter() { @Override public void onHeaders(Stream stream, HeadersFrame frame) @@ -140,19 +86,13 @@ public class HTTP2Test } }); - String host = "localhost"; - int port = connector.getLocalPort(); - String authority = host + ":" + port; - InetSocketAddress address = new InetSocketAddress(host, port); - FuturePromise promise = new FuturePromise<>(); - client.connect(address, new Session.Listener.Adapter(), promise); - Session session = promise.get(); + Session client = newClient(new Session.Listener.Adapter()); HttpFields fields = new HttpFields(); - MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields); + MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame = new HeadersFrame(1, metaData, null, true); final CountDownLatch latch = new CountDownLatch(2); - session.newStream(frame, new Promise.Adapter(), new Stream.Listener.Adapter() + client.newStream(frame, new Promise.Adapter(), new Stream.Listener.Adapter() { @Override public void onHeaders(Stream stream, HeadersFrame frame) @@ -177,6 +117,7 @@ public class HTTP2Test Assert.assertTrue(frame.isEndStream()); Assert.assertEquals(ByteBuffer.wrap(content), frame.getData()); + callback.succeeded(); latch.countDown(); } }); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControl.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControl.java index c5b029ac9b6..44b29c358b0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControl.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControl.java @@ -18,11 +18,21 @@ package org.eclipse.jetty.http2; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; + public interface FlowControl { public void onNewStream(IStream stream); - public int getWindowSize(ISession session); + public int getInitialWindowSize(); - public void setWindowSize(ISession session, int windowSize); + public void updateInitialWindowSize(ISession session, int initialWindowSize); + + public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame); + + public void onDataReceived(ISession session, IStream stream, int length); + + public void onDataConsumed(ISession session, IStream stream, int length); + + public void onDataSent(ISession session, IStream stream, int length); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2FlowControl.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2FlowControl.java index 7845025038e..492c0679b2d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2FlowControl.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2FlowControl.java @@ -18,21 +18,84 @@ package org.eclipse.jetty.http2; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; +import org.eclipse.jetty.util.Callback; + public class HTTP2FlowControl implements FlowControl { + private volatile int initialWindowSize; + + public HTTP2FlowControl(int initialWindowSize) + { + this.initialWindowSize = initialWindowSize; + } + @Override public void onNewStream(IStream stream) { + stream.updateWindowSize(initialWindowSize); } @Override - public int getWindowSize(ISession session) + public int getInitialWindowSize() { - return 0; + return initialWindowSize; } @Override - public void setWindowSize(ISession session, int windowSize) + public void updateInitialWindowSize(ISession session, int initialWindowSize) { + int windowSize = this.initialWindowSize; + this.initialWindowSize = initialWindowSize; + + int delta = initialWindowSize - windowSize; + + // Update the sessions's window size. + session.updateWindowSize(delta); + + // Update the streams' window size. + for (Stream stream : session.getStreams()) + ((IStream)stream).updateWindowSize(delta); + } + + @Override + public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame) + { + if (frame.getStreamId() > 0) + { + if (stream != null) + stream.updateWindowSize(frame.getWindowDelta()); + } + else + { + session.updateWindowSize(frame.getWindowDelta()); + } + } + + @Override + public void onDataReceived(ISession session, IStream stream, int length) + { + } + + @Override + public void onDataConsumed(ISession session, IStream stream, int length) + { + // This is the algorithm for flow control. + // This method is called when a whole flow controlled frame has been consumed. + // We currently send a WindowUpdate every time, even if the frame was very small. + // Other policies may send the WindowUpdate only upon reaching a threshold. + + // Negative streamId allow for generation of bytes for both stream and session + int streamId = stream != null ? -stream.getId() : 0; + WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length); + session.frame(stream, frame, Callback.Adapter.INSTANCE); + } + + @Override + public void onDataSent(ISession session, IStream stream, int length) + { + stream.getSession().updateWindowSize(length); + stream.updateWindowSize(length); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index d3856e8c095..7295cfd18f5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -24,9 +24,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +47,7 @@ import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.parser.ErrorCode; import org.eclipse.jetty.http2.parser.Parser; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.Atomics; @@ -72,23 +75,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener private final AtomicInteger streamIds = new AtomicInteger(); private final AtomicInteger lastStreamId = new AtomicInteger(); private final AtomicInteger streamCount = new AtomicInteger(); - private final Flusher flusher = new Flusher(); + private final AtomicInteger windowSize = new AtomicInteger(); private final EndPoint endPoint; private final Generator generator; private final Listener listener; private final FlowControl flowControl; - private final int initialWindowSize; + private final Flusher flusher; private volatile int maxStreamCount; - public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize, int initialStreamId) + public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialStreamId) { this.endPoint = endPoint; this.generator = generator; this.listener = listener; this.flowControl = flowControl; - this.initialWindowSize = initialWindowSize; + this.flusher = new Flusher(4); this.maxStreamCount = -1; this.streamIds.set(initialStreamId); + this.windowSize.set(flowControl.getInitialWindowSize()); } public Generator getGenerator() @@ -96,11 +100,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener return generator; } - public int getInitialWindowSize() - { - return initialWindowSize; - } - public int getMaxStreamCount() { return maxStreamCount; @@ -112,14 +111,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } @Override - public boolean onData(DataFrame frame) + public boolean onData(final DataFrame frame) { int streamId = frame.getStreamId(); - IStream stream = getStream(streamId); + final IStream stream = getStream(streamId); if (stream != null) { stream.updateClose(frame.isEndStream(), false); - return stream.process(frame); + flowControl.onDataReceived(this, stream, frame.getFlowControlledLength()); + return stream.process(frame, new Callback.Adapter() + { + @Override + public void succeeded() + { + int consumed = frame.getFlowControlledLength(); + LOG.debug("Flow control: {} consumed on {}", consumed, stream); + flowControl.onDataConsumed(HTTP2Session.this, stream, consumed); + } + }); } else { @@ -156,8 +165,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE)) { int windowSize = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE); - setWindowSize(windowSize); - LOG.debug("Updated window size to {}", windowSize); + flowControl.updateInitialWindowSize(this, windowSize); + LOG.debug("Updated initial window size to {}", windowSize); } // TODO: handle other settings notifySettings(this, frame); @@ -202,6 +211,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener @Override public boolean onWindowUpdate(WindowUpdateFrame frame) { + int streamId = frame.getStreamId(); + IStream stream = null; + if (streamId > 0) + stream = getStream(streamId); + flowControl.onWindowUpdate(this, stream, frame); + + // Flush stalled data. + flusher.iterate(); return false; } @@ -231,7 +248,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } stream.updateClose(frame.isEndStream(), true); stream.setListener(listener); - flusher.offer(generator.generate(frame, new PromiseCallback<>(promise, stream))); + + FlusherEntry entry = new FlusherEntry(stream, frame, new PromiseCallback<>(promise, stream)); + flusher.offer(entry); } // Iterate outside the synchronized block. flusher.iterate(); @@ -240,19 +259,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener @Override public void settings(SettingsFrame frame, Callback callback) { - frame(frame, callback); + frame(null, frame, callback); } @Override public void ping(PingFrame frame, Callback callback) { - frame(frame, callback); + frame(null, frame, callback); } @Override public void reset(ResetFrame frame, Callback callback) { - frame(frame, callback); + frame(null, frame, callback); } @Override @@ -261,14 +280,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8); GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload); LOG.debug("Sending {}: {}", frame.getType(), reason); - frame(frame, callback); + frame(null, frame, callback); } @Override - public void frame(Frame frame, Callback callback) + public void frame(IStream stream, Frame frame, Callback callback) { - Generator.LeaseCallback lease = generator.generate(frame, callback); - flusher.flush(lease); + int flowControlledLength = frame.getFlowControlledLength(); + if (flowControlledLength > 0) + callback = new FlowControlCallback(stream, flowControlledLength, callback); + // We want to generate as late as possible to allow re-prioritization. + FlusherEntry entry = new FlusherEntry(stream, frame, callback); + LOG.debug("Sending {}", frame); + flusher.flush(entry); } protected void disconnect() @@ -281,9 +305,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener { IStream stream = newStream(frame); int streamId = stream.getId(); - updateLastStreamId(streamId); if (streams.putIfAbsent(streamId, stream) == null) { + flowControl.onNewStream(stream); LOG.debug("Created local {}", stream); return stream; } @@ -317,6 +341,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener if (streams.putIfAbsent(streamId, stream) == null) { updateLastStreamId(streamId); + flowControl.onNewStream(stream); LOG.debug("Created remote {}", stream); return stream; } @@ -359,16 +384,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener return streams.get(streamId); } + protected int getWindowSize() + { + return windowSize.get(); + } + + @Override + public int updateWindowSize(int delta) + { + int oldSize = windowSize.getAndAdd(delta); + LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this); + return oldSize; + } + private void updateLastStreamId(int streamId) { Atomics.updateMax(lastStreamId, streamId); } - public void setWindowSize(int initialWindowSize) - { - flowControl.setWindowSize(this, initialWindowSize); - } - protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame) { try @@ -394,13 +427,30 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } } + @Override + public String toString() + { + return String.format("%s@%x{queueSize=%d,windowSize=%s,streams=%d}", getClass().getSimpleName(), + hashCode(), flusher.getQueueSize(), windowSize, streams.size()); + } + private class Flusher extends IteratingCallback { - private final Queue queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH); - private Generator.LeaseCallback active; + private final ArrayQueue queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH); + private final Set stalled = new HashSet<>(); + private final List reset = new ArrayList<>(); + private final ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool()); + private final int maxGather; + private final List active; private boolean closed; - private void offer(Generator.LeaseCallback lease) + public Flusher(int maxGather) + { + this.maxGather = maxGather; + this.active = new ArrayList<>(maxGather); + } + + private void offer(FlusherEntry entry) { boolean fail = false; synchronized (queue) @@ -408,31 +458,108 @@ public abstract class HTTP2Session implements ISession, Parser.Listener if (closed) fail = true; else - queue.offer(lease); + queue.offer(entry); } if (fail) - fail(lease); + closed(entry); } - private void flush(Generator.LeaseCallback lease) + public int getQueueSize() { - offer(lease); + synchronized (queue) + { + return queue.size(); + } + } + + private void flush(FlusherEntry entry) + { + offer(entry); iterate(); } @Override protected Action process() throws Exception { - Generator.LeaseCallback current = null; synchronized (queue) { - if (!closed) - current = active = queue.poll(); + if (closed) + return Action.IDLE; + + int nonStalledIndex = 0; + int size = queue.size(); + while (nonStalledIndex < size) + { + FlusherEntry entry = queue.get(nonStalledIndex); + IStream stream = entry.getStream(); + boolean flowControlled = entry.getFrame().getFlowControlledLength() > 0; + if (flowControlled) + { + // Is the session stalled ? + if (getWindowSize() <= 0) + { + LOG.debug("Flow control: session stalled {}", HTTP2Session.this); + ++nonStalledIndex; + // There may be *non* flow controlled frames to send. + continue; + } + + if (stream != null) + { + // Is it a frame belonging to an already stalled stream ? + if (stalled.contains(stream)) + { + ++nonStalledIndex; + continue; + } + + // Is the stream stalled ? + if (stream.getWindowSize() <= 0) + { + LOG.debug("Flow control: stream stalled {}", stream); + stalled.add(stream); + ++nonStalledIndex; + continue; + } + } + } + + // We will be possibly writing this frame. + queue.remove(nonStalledIndex); + --size; + + // Has the stream been reset ? + if (stream != null && stream.isReset() && flowControlled) + { + reset.add(entry); + continue; + } + + active.add(entry); + if (active.size() == maxGather) + break; + } + stalled.clear(); } - if (current == null) + + for (int i = 0; i < reset.size(); ++i) + { + FlusherEntry entry = reset.get(i); + // TODO: introduce a StreamResetException ? + entry.failed(new IllegalStateException()); + } + reset.clear(); + + if (active.isEmpty()) return Action.IDLE; - List byteBuffers = current.getByteBuffers(); + for (int i = 0; i < active.size(); ++i) + { + FlusherEntry entry = active.get(i); + generator.generate(lease, entry.getFrame()); + } + + List byteBuffers = lease.getByteBuffers(); endPoint.write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()])); return Action.SCHEDULED; } @@ -440,25 +567,38 @@ public abstract class HTTP2Session implements ISession, Parser.Listener @Override public void succeeded() { - active.succeeded(); + lease.recycle(); + for (int i = 0; i < active.size(); ++i) + { + FlusherEntry entry = active.get(i); + entry.succeeded(); + } + active.clear(); super.succeeded(); } @Override public void failed(Throwable x) { - active.failed(x); + lease.recycle(); + for (int i = 0; i < active.size(); ++i) + { + FlusherEntry entry = active.get(i); + entry.failed(x); + } + active.clear(); super.failed(x); } @Override protected void completed() { + throw new IllegalStateException(); } public void close() { - Queue queued; + Queue queued; synchronized (queue) { closed = true; @@ -467,19 +607,55 @@ public abstract class HTTP2Session implements ISession, Parser.Listener while (true) { - Generator.LeaseCallback item = queued.poll(); + FlusherEntry item = queued.poll(); if (item == null) break; - fail(item); + closed(item); } } - protected void fail(Generator.LeaseCallback item) + protected void closed(FlusherEntry item) { item.failed(new ClosedChannelException()); } } + private class FlusherEntry implements Callback + { + private final IStream stream; + private final Frame frame; + private final Callback callback; + + private FlusherEntry(IStream stream, Frame frame, Callback callback) + { + this.stream = stream; + this.frame = frame; + this.callback = callback; + } + + public IStream getStream() + { + return stream; + } + + public Frame getFrame() + { + return frame; + } + + @Override + public void succeeded() + { + callback.succeeded(); + } + + @Override + public void failed(Throwable x) + { + callback.failed(x); + } + } + public class PromiseCallback implements Callback { private final Promise promise; @@ -503,4 +679,31 @@ public abstract class HTTP2Session implements ISession, Parser.Listener promise.failed(x); } } + + private class FlowControlCallback implements Callback + { + private final IStream stream; + private final int length; + private final Callback callback; + + private FlowControlCallback(IStream stream, int length, Callback callback) + { + this.stream = stream; + this.length = length; + this.callback = callback; + } + + @Override + public void succeeded() + { + flowControl.onDataSent(HTTP2Session.this, stream, -length); + callback.succeeded(); + } + + @Override + public void failed(Throwable x) + { + callback.failed(x); + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 122eaea4623..a0a8e79752f 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.http2; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http2.frames.DataFrame; @@ -35,9 +36,11 @@ public class HTTP2Stream implements IStream private final AtomicReference> attributes = new AtomicReference<>(); private final AtomicReference closeState = new AtomicReference<>(CloseState.NOT_CLOSED); + private final AtomicInteger windowSize = new AtomicInteger(); private final ISession session; private final HeadersFrame frame; private Listener listener; + private volatile boolean reset = false; public HTTP2Stream(ISession session, HeadersFrame frame) { @@ -60,13 +63,13 @@ public class HTTP2Stream implements IStream @Override public void headers(HeadersFrame frame, Callback callback) { - session.frame(frame, callback); + session.frame(this, frame, callback); } @Override public void data(DataFrame frame, Callback callback) { - session.frame(frame, callback); + session.frame(this, frame, callback); } @Override @@ -87,6 +90,12 @@ public class HTTP2Stream implements IStream return attributes().remove(key); } + @Override + public boolean isReset() + { + return reset; + } + @Override public boolean isClosed() { @@ -120,20 +129,27 @@ public class HTTP2Stream implements IStream } @Override - public boolean process(Frame frame) + public boolean process(Frame frame, Callback callback) { switch (frame.getType()) { case DATA: { - return notifyData((DataFrame)frame); + return notifyData((DataFrame)frame, callback); } case HEADERS: { return false; } + case RST_STREAM: + { + reset = true; + return false; + } default: + { throw new UnsupportedOperationException(); + } } } @@ -182,27 +198,28 @@ public class HTTP2Stream implements IStream } } - protected boolean notifyData(DataFrame frame) + @Override + public int getWindowSize() + { + return windowSize.get(); + } + + @Override + public int updateWindowSize(int delta) + { + int oldSize = windowSize.getAndAdd(delta); + LOG.debug("Flow control: updated window {} -> {} for {}", oldSize, oldSize + delta, this); + return oldSize; + } + + protected boolean notifyData(DataFrame frame, Callback callback) { final Listener listener = this.listener; if (listener == null) return false; try { - listener.onData(this, frame, new Callback() - { - @Override - public void succeeded() - { - // TODO: notify flow control - } - - @Override - public void failed(Throwable x) - { - // TODO: bail out - } - }); + listener.onData(this, frame, callback); return false; } catch (Throwable x) @@ -215,7 +232,8 @@ public class HTTP2Stream implements IStream @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x{id=%d,windowSize=%s,%s}", getClass().getSimpleName(), + hashCode(), getId(), windowSize, closeState); } private enum CloseState diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index e3c4a11c9b0..916dddc6cca 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -27,5 +27,7 @@ public interface ISession extends Session @Override IStream getStream(int streamId); - public void frame(Frame frame, Callback callback); + public void frame(IStream stream, Frame frame, Callback callback); + + public int updateWindowSize(int delta); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index d7cc0222965..5bd23056bb6 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.http2; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.util.Callback; public interface IStream extends Stream { @@ -30,7 +31,7 @@ public interface IStream extends Stream public void setListener(Listener listener); - public boolean process(Frame frame); + public boolean process(Frame frame, Callback callback); /** * Updates the close state of this stream. @@ -42,4 +43,8 @@ public interface IStream extends Stream * that ends the stream). */ public void updateClose(boolean update, boolean local); + + public int getWindowSize(); + + public int updateWindowSize(int delta); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index 9afc09570b7..5e72e05464a 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -38,6 +38,8 @@ public interface Stream public Object removeAttribute(String key); + public boolean isReset(); + public boolean isClosed(); // TODO: see SPDY's Stream @@ -63,6 +65,7 @@ public interface Stream @Override public void onData(Stream stream, DataFrame frame, Callback callback) { + callback.succeeded(); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java index e9e84cdc3dd..3bc7ac73814 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java @@ -24,14 +24,21 @@ public class DataFrame extends Frame { private final int streamId; private final ByteBuffer data; - private boolean endStream; + private final boolean endStream; + private final int length; public DataFrame(int streamId, ByteBuffer data, boolean endStream) + { + this(streamId, data, endStream, 0); + } + + public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding) { super(FrameType.DATA); this.streamId = streamId; this.data = data; this.endStream = endStream; + this.length = data.remaining() + padding; } public int getStreamId() @@ -48,4 +55,16 @@ public class DataFrame extends Frame { return endStream; } + + @Override + public int getFlowControlledLength() + { + return length; + } + + @Override + public String toString() + { + return String.format("%s{length:%d/%d}", super.toString(), data.remaining(), length); + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/Frame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/Frame.java index 782b6480c98..150d97aefc9 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/Frame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/Frame.java @@ -23,7 +23,7 @@ public abstract class Frame public static final int HEADER_LENGTH = 8; public static final int MAX_LENGTH = 0x3F_FF; - private FrameType type; + private final FrameType type; protected Frame(FrameType type) { @@ -35,6 +35,11 @@ public abstract class Frame return type; } + public int getFlowControlledLength() + { + return 0; + } + @Override public String toString() { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java index 3ef4497b3ed..d7f74ff2922 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class DataGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class DataGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { DataFrame dataFrame = (DataFrame)frame; generateData(lease, dataFrame.getStreamId(), dataFrame.getData(), dataFrame.isEndStream(), false, null); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/FrameGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/FrameGenerator.java index e8039de130c..ae345710c23 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/FrameGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/FrameGenerator.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.Callback; public abstract class FrameGenerator { @@ -34,7 +33,7 @@ public abstract class FrameGenerator this.headerGenerator = headerGenerator; } - public abstract void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback); + public abstract void generate(ByteBufferPool.Lease lease, Frame frame); protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java index ab5c6396bfe..61c85d4fe96 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java @@ -22,7 +22,6 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.hpack.HpackEncoder; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.Callback; public class Generator { @@ -59,40 +58,18 @@ public class Generator } + public ByteBufferPool getByteBufferPool() + { + return byteBufferPool; + } + public int getHeaderTableSize() { return headerTableSize; } - public LeaseCallback generate(Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { - LeaseCallback lease = new LeaseCallback(byteBufferPool, callback); - generators[frame.getType().getType()].generate(lease, frame, callback); - return lease; - } - - public static class LeaseCallback extends ByteBufferPool.Lease implements Callback - { - private final Callback callback; - - public LeaseCallback(ByteBufferPool byteBufferPool, Callback callback) - { - super(byteBufferPool); - this.callback = callback; - } - - @Override - public void succeeded() - { - recycle(); - callback.succeeded(); - } - - @Override - public void failed(Throwable x) - { - recycle(); - callback.failed(x); - } + generators[frame.getType().getType()].generate(lease, frame); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/GoAwayGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/GoAwayGenerator.java index 0339c4904d2..121059875f0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/GoAwayGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/GoAwayGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class GoAwayGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class GoAwayGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { GoAwayFrame goAwayFrame = (GoAwayFrame)frame; generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload()); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/HeadersGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/HeadersGenerator.java index 7c1accb284d..e8e19a6be0c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/HeadersGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/HeadersGenerator.java @@ -28,7 +28,6 @@ import org.eclipse.jetty.http2.hpack.HpackEncoder; import org.eclipse.jetty.http2.hpack.MetaData; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class HeadersGenerator extends FrameGenerator { @@ -41,7 +40,7 @@ public class HeadersGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { HeadersFrame headersFrame = (HeadersFrame)frame; generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream(), null); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PingGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PingGenerator.java index 940bb896953..ec5db1a151a 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PingGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PingGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.PingFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class PingGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class PingGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { PingFrame pingFrame = (PingFrame)frame; generatePing(lease, pingFrame.getPayload(), pingFrame.isReply()); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java index 8364a55ddaf..4ebb52d715f 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.PriorityFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class PriorityGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class PriorityGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { PriorityFrame priorityFrame = (PriorityFrame)frame; generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive()); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/ResetGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/ResetGenerator.java index 48b792e0074..781f29346d0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/ResetGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/ResetGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class ResetGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class ResetGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { ResetFrame resetFrame = (ResetFrame)frame; generateReset(lease, resetFrame.getStreamId(), resetFrame.getError()); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/SettingsGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/SettingsGenerator.java index 77f483b3e4e..43971341750 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/SettingsGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/SettingsGenerator.java @@ -27,7 +27,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class SettingsGenerator extends FrameGenerator { @@ -37,7 +36,7 @@ public class SettingsGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { SettingsFrame settingsFrame = (SettingsFrame)frame; generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply()); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/WindowUpdateGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/WindowUpdateGenerator.java index 099930371ee..e9be713989e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/WindowUpdateGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/WindowUpdateGenerator.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; public class WindowUpdateGenerator extends FrameGenerator { @@ -36,7 +35,7 @@ public class WindowUpdateGenerator extends FrameGenerator } @Override - public void generate(ByteBufferPool.Lease lease, Frame frame, Callback callback) + public void generate(ByteBufferPool.Lease lease, Frame frame) { WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame; generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta()); @@ -44,15 +43,28 @@ public class WindowUpdateGenerator extends FrameGenerator public void generateWindowUpdate(ByteBufferPool.Lease lease, int streamId, int windowUpdate) { - if (streamId < 0) - throw new IllegalArgumentException("Invalid stream id: " + streamId); if (windowUpdate < 0) throw new IllegalArgumentException("Invalid window update: " + windowUpdate); + // A negative streamId means that we have to generate + // bytes for both the stream and session frames. + boolean both = false; + if (streamId < 0) + { + both = true; + streamId = -streamId; + } + + if (both) + { + ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flag.NONE, 0); + header.putInt(windowUpdate); + BufferUtil.flipToFlush(header, 0); + lease.append(header, true); + } + ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flag.NONE, streamId); - header.putInt(windowUpdate); - BufferUtil.flipToFlush(header, 0); lease.append(header, true); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/DataBodyParser.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/DataBodyParser.java index 84cff736310..4992f416103 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/DataBodyParser.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/DataBodyParser.java @@ -49,7 +49,7 @@ public class DataBodyParser extends BodyParser notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR, "invalid_data_frame"); return false; } - return onData(BufferUtil.EMPTY_BUFFER, false); + return onData(BufferUtil.EMPTY_BUFFER, false, 0); } @Override @@ -116,7 +116,7 @@ public class DataBodyParser extends BodyParser if (length == 0) { state = State.PADDING; - if (onData(slice, false)) + if (onData(slice, false, paddingLength)) { return Result.ASYNC; } @@ -125,7 +125,7 @@ public class DataBodyParser extends BodyParser { // TODO: check the semantic of Flag.END_SEGMENT. // We got partial data, simulate a smaller frame, and stay in DATA state. - if (onData(slice, true)) + if (onData(slice, true, 0)) { return Result.ASYNC; } @@ -153,9 +153,9 @@ public class DataBodyParser extends BodyParser return Result.PENDING; } - private boolean onData(ByteBuffer buffer, boolean fragment) + private boolean onData(ByteBuffer buffer, boolean fragment, int padding) { - DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream()); + DataFrame frame = new DataFrame(getStreamId(), buffer, !fragment && isEndStream(), padding); return notifyData(frame); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java new file mode 100644 index 00000000000..4999a81be8c --- /dev/null +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -0,0 +1,79 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.server; + +import org.eclipse.jetty.http2.HTTP2Connection; +import org.eclipse.jetty.http2.HTTP2FlowControl; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.generator.Generator; +import org.eclipse.jetty.http2.parser.Parser; +import org.eclipse.jetty.http2.parser.ServerParser; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.Connector; + +public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory +{ + private int headerTableSize = 4096; + private int initialWindowSize = 65535; + + public AbstractHTTP2ServerConnectionFactory() + { + super("h2-12"); + } + + public int getHeaderTableSize() + { + return headerTableSize; + } + + public void setHeaderTableSize(int headerTableSize) + { + this.headerTableSize = headerTableSize; + } + + public int getInitialWindowSize() + { + return initialWindowSize; + } + + public void setInitialWindowSize(int initialWindowSize) + { + this.initialWindowSize = initialWindowSize; + } + + @Override + public Connection newConnection(Connector connector, EndPoint endPoint) + { + Session.Listener listener = newSessionListener(connector, endPoint); + + Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize()); + HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener, + new HTTP2FlowControl(getInitialWindowSize())); + + Parser parser = new ServerParser(connector.getByteBufferPool(), session); + HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(), + endPoint, parser, getInputBufferSize()); + + return configure(connection, connector, endPoint); + } + + protected abstract Session.Listener newSessionListener(Connector connector, EndPoint endPoint); +} diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index da0c5a081e6..57e80e86318 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -18,75 +18,35 @@ package org.eclipse.jetty.http2.server; -import org.eclipse.jetty.http2.HTTP2Connection; -import org.eclipse.jetty.http2.HTTP2FlowControl; import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; -import org.eclipse.jetty.http2.generator.Generator; -import org.eclipse.jetty.http2.parser.Parser; -import org.eclipse.jetty.http2.parser.ServerParser; -import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.server.AbstractConnectionFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HTTP2ServerConnectionFactory extends AbstractConnectionFactory +public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory { private static final Logger LOG = Log.getLogger(HTTP2ServerConnectionFactory.class); private static final String CHANNEL_ATTRIBUTE = HttpChannelOverHTTP2.class.getName(); private final HttpConfiguration httpConfiguration; - private int headerTableSize = 4096; - private int initialWindowSize = 65535; public HTTP2ServerConnectionFactory(HttpConfiguration httpConfiguration) { - super("h2-12"); this.httpConfiguration = httpConfiguration; } - public int getHeaderTableSize() - { - return headerTableSize; - } - - public void setHeaderTableSize(int headerTableSize) - { - this.headerTableSize = headerTableSize; - } - - public int getInitialWindowSize() - { - return initialWindowSize; - } - - public void setInitialWindowSize(int initialWindowSize) - { - this.initialWindowSize = initialWindowSize; - } - @Override - public Connection newConnection(Connector connector, EndPoint endPoint) + protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint) { - Session.Listener listener = new HTTPServerSessionListener(connector, httpConfiguration, endPoint); - - Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize()); - HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener, new HTTP2FlowControl(), - getInitialWindowSize()); - - Parser parser = new ServerParser(connector.getByteBufferPool(), session); - HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(), - endPoint, parser, getInputBufferSize()); - - return configure(connection, connector, endPoint); + return new HTTPServerSessionListener(connector, httpConfiguration, endPoint); } private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 60fd4066a0e..9c34eced0c5 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -29,12 +29,13 @@ import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.parser.ServerParser; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Callback; public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Listener { - public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize) + public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl) { - super(endPoint, generator, listener, flowControl, initialWindowSize, 2); + super(endPoint, generator, listener, flowControl, 2); } @Override @@ -43,7 +44,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis // SPEC: send a SETTINGS frame upon receiving the preface. HashMap settings = new HashMap<>(); settings.put(SettingsFrame.HEADER_TABLE_SIZE, getGenerator().getHeaderTableSize()); - settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialWindowSize()); + settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getFlowControl().getInitialWindowSize()); int maxConcurrentStreams = getMaxStreamCount(); if (maxConcurrentStreams >= 0) settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams); @@ -59,7 +60,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis if (stream != null) { stream.updateClose(frame.isEndStream(), false); - stream.process(frame); + stream.process(frame, Callback.Adapter.INSTANCE); Stream.Listener listener = notifyNewStream(stream, frame); stream.setListener(listener); // The listener may have sent a frame that closed the stream. diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java new file mode 100644 index 00000000000..3dbfd05e30c --- /dev/null +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java @@ -0,0 +1,39 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.server; + +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Connector; + +public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory +{ + private final Session.Listener listener; + + public RawHTTP2ServerConnectionFactory(Session.Listener listener) + { + this.listener = listener; + } + + @Override + protected Session.Listener newSessionListener(Connector connector, EndPoint endPoint) + { + return listener; + } +} diff --git a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java index 2a6940edb01..d172b194310 100644 --- a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java +++ b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java @@ -56,7 +56,6 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -105,7 +104,8 @@ public class HTTP2ServerTest MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(), host + ":" + port, host, port, path, fields); HeadersFrame request = new HeadersFrame(1, metaData, null, true); - Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.generate(lease, request); // No preface bytes @@ -153,7 +153,8 @@ public class HTTP2ServerTest MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(), host + ":" + port, host, port, path, fields); HeadersFrame request = new HeadersFrame(1, metaData, null, true); - Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.generate(lease, request); lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false); try (Socket client = new Socket(host, port)) @@ -215,7 +216,8 @@ public class HTTP2ServerTest MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, HttpMethod.GET.asString(), host + ":" + port, host, port, path, fields); HeadersFrame request = new HeadersFrame(1, metaData, null, true); - Generator.LeaseCallback lease = generator.generate(request, Callback.Adapter.INSTANCE); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.generate(lease, request); lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false); try (Socket client = new Socket(host, port))