diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java new file mode 100644 index 00000000000..4c495c9f75f --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java @@ -0,0 +1,247 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.AsyncContext; +import javax.servlet.ReadListener; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.junit.Assert; +import org.junit.Test; + +public class AsyncIOTest extends AbstractTest +{ + @Test + public void testLastContentAvailableBeforeService() throws Exception + { + start(new HttpServlet() + { + @Override + protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + // Wait for the data to fully arrive. + sleep(1000); + + final AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + request.getInputStream().setReadListener(new EmptyReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + ServletInputStream input = request.getInputStream(); + while (input.isReady()) + { + int read = input.read(); + if (read < 0) + break; + } + if (input.isFinished()) + asyncContext.complete(); + } + }); + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame = new HeadersFrame(1, metaData, null, false); + final CountDownLatch latch = new CountDownLatch(1); + FuturePromise promise = new FuturePromise<>(); + session.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testLastContentAvailableAfterServiceReturns() throws Exception + { + start(new HttpServlet() + { + @Override + protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + final AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + request.getInputStream().setReadListener(new EmptyReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + ServletInputStream input = request.getInputStream(); + while (input.isReady()) + { + int read = input.read(); + if (read < 0) + break; + } + if (input.isFinished()) + asyncContext.complete(); + } + }); + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame = new HeadersFrame(1, metaData, null, false); + final CountDownLatch latch = new CountDownLatch(1); + FuturePromise promise = new FuturePromise<>(); + session.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + + // Wait until service() returns. + Thread.sleep(1000); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testSomeContentAvailableAfterServiceReturns() throws Exception + { + final AtomicInteger count = new AtomicInteger(); + start(new HttpServlet() + { + @Override + protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + final AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + request.getInputStream().setReadListener(new EmptyReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + count.incrementAndGet(); + ServletInputStream input = request.getInputStream(); + while (input.isReady()) + { + int read = input.read(); + if (read < 0) + break; + } + if (input.isFinished()) + asyncContext.complete(); + } + }); + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame = new HeadersFrame(1, metaData, null, false); + final CountDownLatch latch = new CountDownLatch(1); + FuturePromise promise = new FuturePromise<>(); + session.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + + // Wait until service() returns. + Thread.sleep(1000); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE); + + // Wait until onDataAvailable() returns. + Thread.sleep(1000); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + // Make sure onDataAvailable() has been called twice + Assert.assertEquals(2, count.get()); + } + + private static void sleep(long ms) throws InterruptedIOException + { + try + { + Thread.sleep(ms); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + + private static class EmptyReadListener implements ReadListener + { + @Override + public void onDataAvailable() throws IOException + { + } + + @Override + public void onAllDataRead() throws IOException + { + } + + @Override + public void onError(Throwable t) + { + } + } +} diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index b258b142893..795b038b288 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.parser.Parser; @@ -44,6 +45,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; @@ -96,7 +98,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection LOG.debug("Processing {} on {}", frame, stream); HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); Runnable task = channel.onRequest(frame); - offerTask(task, false); + if (task != null) + offerTask(task, false); + } + + public void onData(IStream stream, DataFrame frame, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("Processing {} on {}", frame, stream); + HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); + Runnable task = channel.requestContent(frame, callback); + if (task != null) + offerTask(task, false); } public void push(Connector connector, IStream stream, MetaData.Request request) @@ -105,7 +118,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection LOG.debug("Processing push {} on {}", request, stream); HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); Runnable task = channel.onPushRequest(request); - offerTask(task, true); + if (task != null) + offerTask(task, true); } private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index 147a7066253..3f9366f53b3 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -72,7 +72,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF return !(HTTP2Cipher.isBlackListProtocol(tlsProtocol) && HTTP2Cipher.isBlackListCipher(tlsCipher)); } - public class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener + private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener { private final Connector connector; private final EndPoint endPoint; @@ -83,11 +83,11 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF this.endPoint = endPoint; } - public Connector getConnector() + private HTTP2ServerConnection getConnection() { - return connector; + return (HTTP2ServerConnection)endPoint.getConnection(); } - + @Override public Map onPreface(Session session) { @@ -103,7 +103,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - ((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector, (IStream)stream, frame); + getConnection().onNewStream(connector, (IStream)stream, frame); return frame.isEndStream() ? null : this; } @@ -125,11 +125,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF @Override public void onData(Stream stream, DataFrame frame, Callback callback) { - if (LOG.isDebugEnabled()) - LOG.debug("Processing {} on {}", frame, stream); - - HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); - channel.requestContent(frame, callback); + getConnection().onData((IStream)stream, frame, callback); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 2f1cf86af74..883f990bde4 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -93,6 +93,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel System.lineSeparator(), fields); } + // TODO: support HttpConfiguration.delayDispatchUntilContent return this; } @@ -140,7 +141,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel } } - public void requestContent(DataFrame frame, final Callback callback) + public Runnable requestContent(DataFrame frame, final Callback callback) { // We must copy the data since we do not know when the // application will consume its bytes (we queue them by @@ -148,17 +149,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel // since there may be frames for other streams. final ByteBufferPool byteBufferPool = getByteBufferPool(); ByteBuffer original = frame.getData(); - final ByteBuffer copy = byteBufferPool.acquire(original.remaining(), original.isDirect()); + int length = original.remaining(); + final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect()); BufferUtil.clearToFill(copy); copy.put(original).flip(); - if (LOG.isDebugEnabled()) - { - Stream stream = getStream(); - LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), copy.remaining()); - } - - onContent(new HttpInput.Content(copy) + boolean handle = onContent(new HttpInput.Content(copy) { @Override public void succeeded() @@ -175,10 +171,22 @@ public class HttpChannelOverHTTP2 extends HttpChannel } }); - if (frame.isEndStream()) + boolean endStream = frame.isEndStream(); + if (endStream) + handle |= onRequestComplete(); + + if (LOG.isDebugEnabled()) { - onRequestComplete(); + Stream stream = getStream(); + LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, handle: {}", + stream.getId(), + Integer.toHexString(stream.getSession().hashCode()), + length, + endStream ? "last" : "some", + handle); } + + return handle ? this : null; } /**