Issue #2081 No idle timeout exception when dispatch is delayed * Delegate the readtimeout handling to HttpChannel so that a delayed dispatch can be ended. * Added unit test for delayed dispatch idle * Now using HttpInput.onIdleTimeout() to fail the HttpInput, and then dispatching the request in case it has not been dispatched yet. This ensure consistent behavior independently of the value of HttpConfiguration.delayDispatchUntilContent. * Fixed for both HTTP/1.1 and HTTP/2. * Added tests for non-blocking reads. Signed-off-by: Greg Wilkins <gregw@webtide.com> Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
595c6ac1ee
commit
41050cd8a4
|
@ -263,10 +263,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
|
|||
return new SslConnection(byteBufferPool, executor, endPoint, engine)
|
||||
{
|
||||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
sslIdle.set(true);
|
||||
return super.onReadTimeout();
|
||||
return super.onReadTimeout(timeout);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -126,12 +126,12 @@ public class SslBytesServerTest extends SslBytesTest
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
final Runnable idleHook = SslBytesServerTest.this.idleHook;
|
||||
if (idleHook != null)
|
||||
idleHook.run();
|
||||
return super.onReadTimeout();
|
||||
return super.onReadTimeout(timeout);
|
||||
}
|
||||
}, connector, endPoint);
|
||||
}
|
||||
|
|
|
@ -315,16 +315,19 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
|
|||
|
||||
public boolean onStreamTimeout(Throwable failure, Consumer<Runnable> consumer)
|
||||
{
|
||||
boolean result = false;
|
||||
if (isRequestIdle())
|
||||
{
|
||||
boolean delayed = _delayedUntilContent;
|
||||
_delayedUntilContent = false;
|
||||
|
||||
boolean result = isRequestIdle();
|
||||
if (result)
|
||||
consumeInput();
|
||||
result = true;
|
||||
}
|
||||
|
||||
getHttpTransport().onStreamTimeout(failure);
|
||||
if (getRequest().getHttpInput().onIdleTimeout(failure))
|
||||
if (getRequest().getHttpInput().onIdleTimeout(failure) || delayed)
|
||||
{
|
||||
consumer.accept(this::handleWithContext);
|
||||
result = false;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ public abstract class AbstractConnection implements Connection
|
|||
{
|
||||
boolean close = true;
|
||||
if (cause instanceof TimeoutException)
|
||||
close = onReadTimeout();
|
||||
close = onReadTimeout(cause);
|
||||
if (close)
|
||||
{
|
||||
if (_endPoint.isOutputShutdown())
|
||||
|
@ -183,9 +183,11 @@ public abstract class AbstractConnection implements Connection
|
|||
|
||||
/**
|
||||
* <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
|
||||
*
|
||||
* @param timeout the cause of the read timeout
|
||||
* @return true to signal that the endpoint must be closed, false to keep the endpoint open
|
||||
*/
|
||||
protected boolean onReadTimeout()
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -399,6 +399,17 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
|
|||
return !_delayedForContent;
|
||||
}
|
||||
|
||||
boolean onIdleTimeout(Throwable timeout)
|
||||
{
|
||||
if (_delayedForContent)
|
||||
{
|
||||
_delayedForContent = false;
|
||||
getRequest().getHttpInput().onIdleTimeout(timeout);
|
||||
execute(this);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Attempts to perform a HTTP/1.1 upgrade.</p>
|
||||
|
|
|
@ -286,7 +286,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Fill and parse data looking for content
|
||||
* @return true if an {@link RequestHandler} method was called and it returned true;
|
||||
|
@ -486,6 +486,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
return _channel.onIdleTimeout(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFillInterestedFailed(Throwable cause)
|
||||
{
|
||||
|
|
|
@ -782,7 +782,8 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
if (_waitingForContent && !isError())
|
||||
boolean neverDispatched = getHttpChannelState().isIdle();
|
||||
if ((_waitingForContent || neverDispatched) && !isError())
|
||||
{
|
||||
x.addSuppressed(new Throwable("HttpInput idle timeout"));
|
||||
_state = new ErrorState(x);
|
||||
|
|
|
@ -601,7 +601,8 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
|||
long start = System.currentTimeMillis();
|
||||
try
|
||||
{
|
||||
IO.toString(is);
|
||||
String response = IO.toString(is);
|
||||
Assert.assertThat(response,Matchers.is(""));
|
||||
Assert.assertEquals(-1, is.read());
|
||||
}
|
||||
catch(SSLException e)
|
||||
|
@ -629,12 +630,13 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
|||
long start = System.currentTimeMillis();
|
||||
try
|
||||
{
|
||||
IO.toString(is);
|
||||
String response = IO.toString(is);
|
||||
Assert.assertThat(response,Matchers.is(""));
|
||||
Assert.assertEquals(-1, is.read());
|
||||
}
|
||||
catch(SSLException e)
|
||||
{
|
||||
// e.printStackTrace();
|
||||
e.printStackTrace();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
|
@ -644,6 +646,88 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
|||
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testMaxIdleDelayedDispatch() throws Exception
|
||||
{
|
||||
configureServer(new EchoHandler());
|
||||
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
|
||||
client.setSoTimeout(10000);
|
||||
InputStream is=client.getInputStream();
|
||||
Assert.assertFalse(client.isClosed());
|
||||
|
||||
OutputStream os=client.getOutputStream();
|
||||
os.write((
|
||||
"GET / HTTP/1.1\r\n"+
|
||||
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
|
||||
"connection: keep-alive\r\n"+
|
||||
"Content-Length: 20\r\n"+
|
||||
"Content-Type: text/plain\r\n"+
|
||||
"Connection: close\r\n"+
|
||||
"\r\n").getBytes("utf-8"));
|
||||
os.flush();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
try
|
||||
{
|
||||
String response = IO.toString(is);
|
||||
Assert.assertThat(response,Matchers.containsString("500"));
|
||||
Assert.assertEquals(-1, is.read());
|
||||
}
|
||||
catch(SSLException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
int duration = (int)(System.currentTimeMillis() - start);
|
||||
Assert.assertThat(duration,Matchers.greaterThanOrEqualTo(MAX_IDLE_TIME));
|
||||
Assert.assertThat(duration,Matchers.lessThan(maximumTestRuntime));
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testMaxIdleDispatch() throws Exception
|
||||
{
|
||||
configureServer(new EchoHandler());
|
||||
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
|
||||
client.setSoTimeout(10000);
|
||||
InputStream is=client.getInputStream();
|
||||
Assert.assertFalse(client.isClosed());
|
||||
|
||||
OutputStream os=client.getOutputStream();
|
||||
os.write((
|
||||
"GET / HTTP/1.1\r\n"+
|
||||
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
|
||||
"connection: keep-alive\r\n"+
|
||||
"Content-Length: 20\r\n"+
|
||||
"Content-Type: text/plain\r\n"+
|
||||
"Connection: close\r\n"+
|
||||
"\r\n"+
|
||||
"1234567890").getBytes("utf-8"));
|
||||
os.flush();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
try
|
||||
{
|
||||
String response = IO.toString(is);
|
||||
Assert.assertThat(response,Matchers.containsString("500"));
|
||||
Assert.assertEquals(-1, is.read());
|
||||
}
|
||||
catch(SSLException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
int duration = (int)(System.currentTimeMillis() - start);
|
||||
Assert.assertThat(duration,Matchers.greaterThanOrEqualTo(MAX_IDLE_TIME));
|
||||
Assert.assertThat(duration,Matchers.lessThan(maximumTestRuntime));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testMaxIdleWithSlowRequest() throws Exception
|
||||
{
|
||||
|
|
|
@ -18,14 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.SocketTimeoutException;
|
||||
|
@ -78,6 +70,14 @@ import org.junit.Ignore;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class ClientCloseTest
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
|
||||
|
@ -147,7 +147,7 @@ public class ClientCloseTest
|
|||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
LOG.warn("onWebSocketError",cause);
|
||||
LOG.debug("onWebSocketError",cause);
|
||||
assertThat("Unique Error Event", error.compareAndSet(null, cause), is(true));
|
||||
errorLatch.countDown();
|
||||
}
|
||||
|
@ -526,8 +526,7 @@ public class ClientCloseTest
|
|||
|
||||
// client idle timeout triggers close event on client ws-endpoint
|
||||
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
|
||||
assertThat("OnError", clientSocket.error.get(), instanceOf(SocketTimeoutException.class));
|
||||
assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Timeout on Read"));
|
||||
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common.io;
|
|||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -448,11 +447,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
this.ioState.onOpened();
|
||||
}
|
||||
|
||||
/**
|
||||
* Event for no activity on connection (read or write)
|
||||
*/
|
||||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
IOState state = getIOState();
|
||||
ConnectionState cstate = state.getConnectionState();
|
||||
|
@ -470,7 +466,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
try
|
||||
{
|
||||
notifyError(new SocketTimeoutException("Timeout on Read"));
|
||||
notifyError(timeout);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.eclipse.jetty.http.HttpStatus;
|
|||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
|
||||
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
|
@ -75,34 +76,114 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDelayedDispatchRequestWithDelayedFirstContentIdleTimeoutFires() throws Exception
|
||||
public void testBlockingReadWithDelayedFirstContentWithUndelayedDispatchIdleTimeoutFires() throws Exception
|
||||
{
|
||||
httpConfig.setDelayDispatchUntilContent(true);
|
||||
CountDownLatch handlerLatch = new CountDownLatch(1);
|
||||
start(new AbstractHandler.ErrorDispatchHandler()
|
||||
testBlockingReadWithDelayedFirstContentIdleTimeoutFires(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockingReadWithDelayedFirstContentWithDelayedDispatchIdleTimeoutFires() throws Exception
|
||||
{
|
||||
testBlockingReadWithDelayedFirstContentIdleTimeoutFires(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncReadWithDelayedFirstContentWithUndelayedDispatchIdleTimeoutFires() throws Exception
|
||||
{
|
||||
testAsyncReadWithDelayedFirstContentIdleTimeoutFires(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncReadWithDelayedFirstContentWithDelayedDispatchIdleTimeoutFires() throws Exception
|
||||
{
|
||||
testAsyncReadWithDelayedFirstContentIdleTimeoutFires(true);
|
||||
}
|
||||
|
||||
private void testBlockingReadWithDelayedFirstContentIdleTimeoutFires(boolean delayDispatch) throws Exception
|
||||
{
|
||||
testReadWithDelayedFirstContentIdleTimeoutFires(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
handlerLatch.countDown();
|
||||
// The client did not send the content,
|
||||
// idle timeout should result in IOException.
|
||||
request.getInputStream().read();
|
||||
}
|
||||
});
|
||||
long idleTimeout = 2500;
|
||||
setServerIdleTimeout(idleTimeout);
|
||||
}, delayDispatch);
|
||||
}
|
||||
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
client.POST(newURI())
|
||||
.content(new DeferredContentProvider())
|
||||
.send(result ->
|
||||
private void testAsyncReadWithDelayedFirstContentIdleTimeoutFires(boolean delayDispatch) throws Exception
|
||||
{
|
||||
testReadWithDelayedFirstContentIdleTimeoutFires(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
request.getInputStream().setReadListener(new ReadListener()
|
||||
{
|
||||
if (result.isFailed())
|
||||
resultLatch.countDown();
|
||||
@Override
|
||||
public void onDataAvailable()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllDataRead()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
if (t instanceof TimeoutException)
|
||||
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
|
||||
// We did not send the content, the request was not
|
||||
// dispatched, the server should have idle timed out.
|
||||
Assert.assertFalse(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
}, delayDispatch);
|
||||
}
|
||||
|
||||
private void testReadWithDelayedFirstContentIdleTimeoutFires(Handler handler, boolean delayDispatch) throws Exception
|
||||
{
|
||||
httpConfig.setDelayDispatchUntilContent(delayDispatch);
|
||||
CountDownLatch handlerLatch = new CountDownLatch(1);
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
try
|
||||
{
|
||||
handler.handle(target, jettyRequest, request, response);
|
||||
}
|
||||
finally
|
||||
{
|
||||
handlerLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
long idleTimeout = 1000;
|
||||
setServerIdleTimeout(idleTimeout);
|
||||
|
||||
CountDownLatch resultLatch = new CountDownLatch(2);
|
||||
DeferredContentProvider content = new DeferredContentProvider();
|
||||
client.POST(newURI())
|
||||
.content(content)
|
||||
.onResponseSuccess(response ->
|
||||
{
|
||||
if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
|
||||
resultLatch.countDown();
|
||||
content.close();
|
||||
})
|
||||
.send(result -> resultLatch.countDown());
|
||||
|
||||
// The client did not send the content, the request was
|
||||
// dispatched, the server should have idle timed it out.
|
||||
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue