Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-05-07 14:50:56 +02:00
commit 19c63703ea
4 changed files with 97 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -33,6 +34,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -44,12 +46,11 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;
public class IdleTimeoutTest extends AbstractTest
{
private final int idleTimeout = 1000;
@ -364,7 +365,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onTimeout(Stream stream, Throwable x)
{
assertThat(x, instanceOf(TimeoutException.class));
Assert.assertThat(x, Matchers.instanceOf(TimeoutException.class));
timeoutLatch.countDown();
}
});
@ -531,6 +532,55 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
}
@Test
public void testBufferedReadsResetStreamIdleTimeout() throws Exception
{
long delay = 1000;
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[8192];
while (true)
{
int read = input.read(buffer);
Log.getLogger(IdleTimeoutTest.class).info("Read {} bytes", read);
if (read < 0)
break;
sleep(delay);
}
}
});
connector.setIdleTimeout(2 * delay);
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Send data larger than the flow control window.
// The client will send bytes up to the flow control window immediately
// and they will be buffered by the server, which will read them slowly.
// Server reads should reset the idle timeout.
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE + 1);
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
Assert.assertTrue(latch.await(555, TimeUnit.SECONDS));
}
private void sleep(long value)
{
try

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -84,6 +85,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private int maxRemoteStreams;
private long streamIdleTimeout;
private boolean pushEnabled;
private long idleTime;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
@ -100,6 +102,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.pushEnabled = true; // SPEC: by default, push is enabled.
this.idleTime = System.nanoTime();
}
@Override
@ -183,7 +186,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void succeeded()
{
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
complete();
}
@Override
@ -191,6 +194,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// Consume also in case of failures, to free the
// session flow control window for other streams.
complete();
}
private void complete()
{
notIdle();
stream.notIdle();
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
@ -408,8 +418,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
IStream stream = getStream(streamId);
if (stream != null)
{
stream.process(frame, Callback.NOOP);
onWindowUpdate(stream, frame);
}
}
else
{
onWindowUpdate(null, frame);
@ -619,8 +632,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
{
if (entry.stream != null)
entry.stream.notIdle();
flusher.iterate();
}
}
protected IStream createLocalStream(int streamId, Promise<Stream> promise)
{
@ -866,6 +883,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
case NOT_CLOSED:
{
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime);
if (elapsed < endPoint.getIdleTimeout())
return false;
return notifyIdleTimeout(this);
}
case LOCALLY_CLOSED:
@ -881,6 +901,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
private void notIdle()
{
idleTime = System.nanoTime();
}
@Override
public void onFrame(Frame frame)
{

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -87,14 +88,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
if (!checkWrite(callback))
return;
notIdle();
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
}
@Override
public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
{
notIdle();
session.push(this, promise, frame, listener);
}
@ -103,7 +102,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
if (!checkWrite(callback))
return;
notIdle();
session.data(this, this, frame);
}
@ -112,7 +110,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
if (isReset())
return;
notIdle();
localReset = true;
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}
@ -240,6 +237,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
onPush((PushPromiseFrame)frame, callback);
break;
}
case WINDOW_UPDATE:
{
onWindowUpdate((WindowUpdateFrame)frame, callback);
break;
}
default:
{
throw new UnsupportedOperationException();
@ -302,6 +304,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
callback.succeeded();
}
private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)
{
callback.succeeded();
}
@Override
public boolean updateClose(boolean update, boolean local)
{

View File

@ -99,4 +99,10 @@ public interface IStream extends Stream, Closeable
* @return the previous value of the stream receive window
*/
public int updateRecvWindow(int delta);
/**
* <p>Marks this stream as not idle so that the
* {@link #getIdleTimeout() idle timeout} is postponed.</p>
*/
public void notIdle();
}