Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-05-07 14:51:29 +02:00
commit f4353695e5
4 changed files with 97 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -33,6 +34,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
@ -44,12 +46,11 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;
public class IdleTimeoutTest extends AbstractTest public class IdleTimeoutTest extends AbstractTest
{ {
private final int idleTimeout = 1000; private final int idleTimeout = 1000;
@ -364,7 +365,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override @Override
public void onTimeout(Stream stream, Throwable x) public void onTimeout(Stream stream, Throwable x)
{ {
assertThat(x, instanceOf(TimeoutException.class)); Assert.assertThat(x, Matchers.instanceOf(TimeoutException.class));
timeoutLatch.countDown(); timeoutLatch.countDown();
} }
}); });
@ -531,6 +532,55 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS)); Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
} }
@Test
public void testBufferedReadsResetStreamIdleTimeout() throws Exception
{
long delay = 1000;
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[8192];
while (true)
{
int read = input.read(buffer);
Log.getLogger(IdleTimeoutTest.class).info("Read {} bytes", read);
if (read < 0)
break;
sleep(delay);
}
}
});
connector.setIdleTimeout(2 * delay);
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Send data larger than the flow control window.
// The client will send bytes up to the flow control window immediately
// and they will be buffered by the server, which will read them slowly.
// Server reads should reset the idle timeout.
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE + 1);
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
Assert.assertTrue(latch.await(555, TimeUnit.SECONDS));
}
private void sleep(long value) private void sleep(long value)
{ {
try try

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -84,6 +85,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private int maxRemoteStreams; private int maxRemoteStreams;
private long streamIdleTimeout; private long streamIdleTimeout;
private boolean pushEnabled; private boolean pushEnabled;
private long idleTime;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId) public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{ {
@ -100,6 +102,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.pushEnabled = true; // SPEC: by default, push is enabled. this.pushEnabled = true; // SPEC: by default, push is enabled.
this.idleTime = System.nanoTime();
} }
@Override @Override
@ -183,7 +186,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void succeeded() public void succeeded()
{ {
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); complete();
} }
@Override @Override
@ -191,6 +194,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
// Consume also in case of failures, to free the // Consume also in case of failures, to free the
// session flow control window for other streams. // session flow control window for other streams.
complete();
}
private void complete()
{
notIdle();
stream.notIdle();
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
} }
}); });
@ -408,7 +418,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
IStream stream = getStream(streamId); IStream stream = getStream(streamId);
if (stream != null) if (stream != null)
{
stream.process(frame, Callback.NOOP);
onWindowUpdate(stream, frame); onWindowUpdate(stream, frame);
}
} }
else else
{ {
@ -619,7 +632,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// Ping frames are prepended to process them as soon as possible. // Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush) if (queued && flush)
{
if (entry.stream != null)
entry.stream.notIdle();
flusher.iterate(); flusher.iterate();
}
} }
protected IStream createLocalStream(int streamId, Promise<Stream> promise) protected IStream createLocalStream(int streamId, Promise<Stream> promise)
@ -866,6 +883,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
case NOT_CLOSED: case NOT_CLOSED:
{ {
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime);
if (elapsed < endPoint.getIdleTimeout())
return false;
return notifyIdleTimeout(this); return notifyIdleTimeout(this);
} }
case LOCALLY_CLOSED: case LOCALLY_CLOSED:
@ -881,6 +901,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private void notIdle()
{
idleTime = System.nanoTime();
}
@Override @Override
public void onFrame(Frame frame) public void onFrame(Frame frame)
{ {

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
@ -87,14 +88,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{ {
if (!checkWrite(callback)) if (!checkWrite(callback))
return; return;
notIdle();
session.frames(this, this, frame, Frame.EMPTY_ARRAY); session.frames(this, this, frame, Frame.EMPTY_ARRAY);
} }
@Override @Override
public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener) public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
{ {
notIdle();
session.push(this, promise, frame, listener); session.push(this, promise, frame, listener);
} }
@ -103,7 +102,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{ {
if (!checkWrite(callback)) if (!checkWrite(callback))
return; return;
notIdle();
session.data(this, this, frame); session.data(this, this, frame);
} }
@ -112,7 +110,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{ {
if (isReset()) if (isReset())
return; return;
notIdle();
localReset = true; localReset = true;
session.frames(this, callback, frame, Frame.EMPTY_ARRAY); session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
} }
@ -240,6 +237,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
onPush((PushPromiseFrame)frame, callback); onPush((PushPromiseFrame)frame, callback);
break; break;
} }
case WINDOW_UPDATE:
{
onWindowUpdate((WindowUpdateFrame)frame, callback);
break;
}
default: default:
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -302,6 +304,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
callback.succeeded(); callback.succeeded();
} }
private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)
{
callback.succeeded();
}
@Override @Override
public boolean updateClose(boolean update, boolean local) public boolean updateClose(boolean update, boolean local)
{ {

View File

@ -99,4 +99,10 @@ public interface IStream extends Stream, Closeable
* @return the previous value of the stream receive window * @return the previous value of the stream receive window
*/ */
public int updateRecvWindow(int delta); public int updateRecvWindow(int delta);
/**
* <p>Marks this stream as not idle so that the
* {@link #getIdleTimeout() idle timeout} is postponed.</p>
*/
public void notIdle();
} }