367716: maxIdleTime fixes for async http and new maxIdleTime tests
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
06f0498a48
commit
d6e841beae
|
@ -56,7 +56,8 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
|
|||
setCurrentConnection(this);
|
||||
|
||||
// don't check for idle while dispatched (unless blocking IO is done).
|
||||
_asyncEndp.setCheckForIdle(false);
|
||||
if(!_request.isAsyncCompleted())
|
||||
_asyncEndp.setCheckForIdle(false);
|
||||
|
||||
|
||||
// While progress and the connection has not changed
|
||||
|
@ -144,11 +145,13 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
|
|||
// return buffers
|
||||
_parser.returnBuffers();
|
||||
_generator.returnBuffers();
|
||||
|
||||
// resuming checking for idle
|
||||
}
|
||||
|
||||
if (_request.isAsyncCompleted() || _request.isAsyncInitial())
|
||||
{
|
||||
_asyncEndp.setCheckForIdle(true);
|
||||
}
|
||||
|
||||
|
||||
// Safety net to catch spinning
|
||||
if (some_progress)
|
||||
_total_no_progress=0;
|
||||
|
|
|
@ -1265,6 +1265,18 @@ public class Request implements HttpServletRequest
|
|||
return _async.isAsyncStarted();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isAsyncInitial()
|
||||
{
|
||||
return _async.isInitial();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isAsyncCompleted()
|
||||
{
|
||||
return _async.isComplete();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isAsyncSupported()
|
||||
{
|
||||
|
|
|
@ -16,16 +16,10 @@ package org.eclipse.jetty.server;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationListener;
|
||||
import org.eclipse.jetty.server.handler.HandlerWrapper;
|
||||
import org.eclipse.jetty.server.session.SessionHandler;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -151,149 +145,8 @@ public class LocalAsyncContextTest
|
|||
return ((LocalConnector)_connector).getResponses(request);
|
||||
}
|
||||
|
||||
private static class SuspendHandler extends HandlerWrapper
|
||||
{
|
||||
private int _read;
|
||||
private long _suspendFor=-1;
|
||||
private long _resumeAfter=-1;
|
||||
private long _completeAfter=-1;
|
||||
|
||||
public SuspendHandler()
|
||||
{
|
||||
}
|
||||
|
||||
public int getRead()
|
||||
{
|
||||
return _read;
|
||||
}
|
||||
|
||||
public void setRead(int read)
|
||||
{
|
||||
_read = read;
|
||||
}
|
||||
|
||||
public long getSuspendFor()
|
||||
{
|
||||
return _suspendFor;
|
||||
}
|
||||
|
||||
public void setSuspendFor(long suspendFor)
|
||||
{
|
||||
_suspendFor = suspendFor;
|
||||
}
|
||||
|
||||
public long getResumeAfter()
|
||||
{
|
||||
return _resumeAfter;
|
||||
}
|
||||
|
||||
public void setResumeAfter(long resumeAfter)
|
||||
{
|
||||
_resumeAfter = resumeAfter;
|
||||
}
|
||||
|
||||
public long getCompleteAfter()
|
||||
{
|
||||
return _completeAfter;
|
||||
}
|
||||
|
||||
public void setCompleteAfter(long completeAfter)
|
||||
{
|
||||
_completeAfter = completeAfter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (DispatcherType.REQUEST.equals(baseRequest.getDispatcherType()))
|
||||
{
|
||||
if (_read>0)
|
||||
{
|
||||
byte[] buf=new byte[_read];
|
||||
request.getInputStream().read(buf);
|
||||
}
|
||||
else if (_read<0)
|
||||
{
|
||||
InputStream in = request.getInputStream();
|
||||
int b=in.read();
|
||||
while(b!=-1)
|
||||
b=in.read();
|
||||
}
|
||||
|
||||
final AsyncContext asyncContext = baseRequest.startAsync();
|
||||
asyncContext.addContinuationListener(__asyncListener);
|
||||
if (_suspendFor>0)
|
||||
asyncContext.setTimeout(_suspendFor);
|
||||
|
||||
if (_completeAfter>0)
|
||||
{
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(_completeAfter);
|
||||
response.getOutputStream().print("COMPLETED");
|
||||
response.setStatus(200);
|
||||
baseRequest.setHandled(true);
|
||||
asyncContext.complete();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
else if (_completeAfter==0)
|
||||
{
|
||||
response.getOutputStream().print("COMPLETED");
|
||||
response.setStatus(200);
|
||||
baseRequest.setHandled(true);
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
if (_resumeAfter>0)
|
||||
{
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(_resumeAfter);
|
||||
if(((HttpServletRequest)asyncContext.getRequest()).getSession(true).getId()!=null)
|
||||
asyncContext.dispatch();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
else if (_resumeAfter==0)
|
||||
{
|
||||
asyncContext.dispatch();
|
||||
}
|
||||
}
|
||||
else if (request.getAttribute("TIMEOUT")!=null)
|
||||
{
|
||||
response.setStatus(200);
|
||||
response.getOutputStream().print("TIMEOUT");
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
response.setStatus(200);
|
||||
response.getOutputStream().print("RESUMED");
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ContinuationListener __asyncListener = new ContinuationListener()
|
||||
|
||||
static ContinuationListener __asyncListener = new ContinuationListener()
|
||||
{
|
||||
public void onComplete(Continuation continuation)
|
||||
{
|
||||
|
|
|
@ -13,17 +13,92 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
|
||||
import org.eclipse.jetty.server.nio.SelectChannelConnector;
|
||||
import org.junit.BeforeClass;
|
||||
import org.eclipse.jetty.server.session.SessionHandler;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SelectChannelTimeoutTest extends ConnectorTimeoutTest
|
||||
{
|
||||
@BeforeClass
|
||||
public static void init() throws Exception
|
||||
|
||||
@Before
|
||||
public void init() throws Exception
|
||||
{
|
||||
SelectChannelConnector connector = new SelectChannelConnector();
|
||||
connector.setMaxIdleTime(MAX_IDLE_TIME); //250 msec max idle
|
||||
connector.setMaxIdleTime(MAX_IDLE_TIME); // 250 msec max idle
|
||||
startServer(connector);
|
||||
}
|
||||
|
||||
@Test(expected=SocketException.class)
|
||||
public void testIdleTimeoutAfterSuspend() throws Exception
|
||||
{
|
||||
SuspendHandler _handler = new SuspendHandler();
|
||||
_server.stop();
|
||||
SessionHandler session = new SessionHandler();
|
||||
session.setHandler(_handler);
|
||||
_server.setHandler(session);
|
||||
_server.start();
|
||||
|
||||
_handler.setSuspendFor(100);
|
||||
_handler.setResumeAfter(25);
|
||||
process(null);
|
||||
}
|
||||
|
||||
@Test(expected=SocketException.class)
|
||||
public void testIdleTimeoutAfterTimeout() throws Exception
|
||||
{
|
||||
SuspendHandler _handler = new SuspendHandler();
|
||||
_server.stop();
|
||||
SessionHandler session = new SessionHandler();
|
||||
session.setHandler(_handler);
|
||||
_server.setHandler(session);
|
||||
_server.start();
|
||||
|
||||
_handler.setSuspendFor(50);
|
||||
System.out.println(process(null));
|
||||
}
|
||||
|
||||
@Test(expected=SocketException.class)
|
||||
public void testIdleTimeoutAfterComplete() throws Exception
|
||||
{
|
||||
SuspendHandler _handler = new SuspendHandler();
|
||||
_server.stop();
|
||||
SessionHandler session = new SessionHandler();
|
||||
session.setHandler(_handler);
|
||||
_server.setHandler(session);
|
||||
_server.start();
|
||||
|
||||
_handler.setSuspendFor(100);
|
||||
_handler.setCompleteAfter(25);
|
||||
System.out.println(process(null));
|
||||
}
|
||||
|
||||
// TODO: remove code duplication to LocalAsyncContextTest.java
|
||||
private synchronized String process(String content) throws Exception
|
||||
{
|
||||
String request = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: close\r\n";
|
||||
|
||||
if (content == null)
|
||||
request += "\r\n";
|
||||
else
|
||||
request += "Content-Length: " + content.length() + "\r\n" + "\r\n" + content;
|
||||
return getResponse(request);
|
||||
}
|
||||
|
||||
protected String getResponse(String request) throws Exception
|
||||
{
|
||||
SelectChannelConnector connector = (SelectChannelConnector)_connector;
|
||||
Socket socket = new Socket((String)null,connector.getLocalPort());
|
||||
socket.getOutputStream().write(request.getBytes("UTF-8"));
|
||||
InputStream inputStream = socket.getInputStream();
|
||||
Thread.sleep(500);
|
||||
socket.getOutputStream().write(10);
|
||||
return IO.toString(inputStream);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue