464706 - HTTP/2 and async I/O: onDataAvailable() not called.

Changed HTTPServerConnection to return a Runnable to be run by the
execution strategy also in case of content.
This allows onDataAvailable() to be called at the proper times.
This commit is contained in:
Simone Bordet 2015-04-15 17:21:17 +02:00
parent 0360b1cc5b
commit e868ab0b0d
4 changed files with 288 additions and 23 deletions

View File

@ -0,0 +1,247 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class AsyncIOTest extends AbstractTest
{
@Test
public void testLastContentAvailableBeforeService() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Wait for the data to fully arrive.
sleep(1000);
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new EmptyReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
ServletInputStream input = request.getInputStream();
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
if (input.isFinished())
asyncContext.complete();
}
});
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, false);
final CountDownLatch latch = new CountDownLatch(1);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testLastContentAvailableAfterServiceReturns() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new EmptyReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
ServletInputStream input = request.getInputStream();
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
if (input.isFinished())
asyncContext.complete();
}
});
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, false);
final CountDownLatch latch = new CountDownLatch(1);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Wait until service() returns.
Thread.sleep(1000);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testSomeContentAvailableAfterServiceReturns() throws Exception
{
final AtomicInteger count = new AtomicInteger();
start(new HttpServlet()
{
@Override
protected void service(final HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new EmptyReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
count.incrementAndGet();
ServletInputStream input = request.getInputStream();
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
if (input.isFinished())
asyncContext.complete();
}
});
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, false);
final CountDownLatch latch = new CountDownLatch(1);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Wait until service() returns.
Thread.sleep(1000);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
// Wait until onDataAvailable() returns.
Thread.sleep(1000);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Make sure onDataAvailable() has been called twice
Assert.assertEquals(2, count.get());
}
private static void sleep(long ms) throws InterruptedIOException
{
try
{
Thread.sleep(ms);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
private static class EmptyReadListener implements ReadListener
{
@Override
public void onDataAvailable() throws IOException
{
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
}
}
}

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.Parser;
@ -44,6 +45,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
@ -96,7 +98,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame);
offerTask(task, false);
if (task != null)
offerTask(task, false);
}
public void onData(IStream stream, DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
Runnable task = channel.requestContent(frame, callback);
if (task != null)
offerTask(task, false);
}
public void push(Connector connector, IStream stream, MetaData.Request request)
@ -105,7 +118,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
LOG.debug("Processing push {} on {}", request, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request);
offerTask(task, true);
if (task != null)
offerTask(task, true);
}
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)

View File

@ -72,7 +72,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return !(HTTP2Cipher.isBlackListProtocol(tlsProtocol) && HTTP2Cipher.isBlackListCipher(tlsCipher));
}
public class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
{
private final Connector connector;
private final EndPoint endPoint;
@ -83,9 +83,9 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
this.endPoint = endPoint;
}
public Connector getConnector()
private HTTP2ServerConnection getConnection()
{
return connector;
return (HTTP2ServerConnection)endPoint.getConnection();
}
@Override
@ -103,7 +103,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector, (IStream)stream, frame);
getConnection().onNewStream(connector, (IStream)stream, frame);
return frame.isEndStream() ? null : this;
}
@ -125,11 +125,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.requestContent(frame, callback);
getConnection().onData((IStream)stream, frame, callback);
}
@Override

View File

@ -93,6 +93,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
System.lineSeparator(), fields);
}
// TODO: support HttpConfiguration.delayDispatchUntilContent
return this;
}
@ -140,7 +141,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
}
public void requestContent(DataFrame frame, final Callback callback)
public Runnable requestContent(DataFrame frame, final Callback callback)
{
// We must copy the data since we do not know when the
// application will consume its bytes (we queue them by
@ -148,17 +149,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
// since there may be frames for other streams.
final ByteBufferPool byteBufferPool = getByteBufferPool();
ByteBuffer original = frame.getData();
final ByteBuffer copy = byteBufferPool.acquire(original.remaining(), original.isDirect());
int length = original.remaining();
final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect());
BufferUtil.clearToFill(copy);
copy.put(original).flip();
if (LOG.isDebugEnabled())
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), copy.remaining());
}
onContent(new HttpInput.Content(copy)
boolean handle = onContent(new HttpInput.Content(copy)
{
@Override
public void succeeded()
@ -175,10 +171,22 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
});
if (frame.isEndStream())
boolean endStream = frame.isEndStream();
if (endStream)
handle |= onRequestComplete();
if (LOG.isDebugEnabled())
{
onRequestComplete();
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
handle);
}
return handle ? this : null;
}
/**