Improved graceful shutdown and added tests
This commit is contained in:
parent
2d0bedd9bf
commit
04062a8383
|
@ -31,9 +31,12 @@ import java.util.Set;
|
|||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.LocalConnector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.server.handler.ResourceHandler;
|
||||
import org.eclipse.jetty.server.session.SessionHandler;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
|
@ -79,7 +82,10 @@ public class AliasedConstraintTest
|
|||
|
||||
context.setContextPath("/ctx");
|
||||
context.setResourceBase(MavenTestingUtils.getTestResourceDir("docroot").getAbsolutePath());
|
||||
server.setHandler(context);
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.setHandlers(new Handler[]{context,new DefaultHandler()});
|
||||
server.setHandler(handlers);
|
||||
context.setHandler(session);
|
||||
// context.addAliasCheck(new AllowSymLinkAliasChecker());
|
||||
|
||||
|
|
|
@ -276,7 +276,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
// The loop is controlled by the call to async.unhandle in the
|
||||
// finally block below. Unhandle will return false only if an async dispatch has
|
||||
// already happened when unhandle is called.
|
||||
loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning())
|
||||
loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && !getServer().isStopped())
|
||||
{
|
||||
boolean error=false;
|
||||
try
|
||||
|
@ -376,7 +376,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
catch (Exception e)
|
||||
{
|
||||
error=true;
|
||||
if (_connector.isStarted())
|
||||
LOG.warn(String.valueOf(_request.getHttpURI()), e);
|
||||
else
|
||||
LOG.debug(String.valueOf(_request.getHttpURI()), e);
|
||||
_state.error(e);
|
||||
_request.setHandled(true);
|
||||
handleException(e);
|
||||
|
@ -388,6 +391,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
else
|
||||
{
|
||||
error=true;
|
||||
if (_connector.isStarted())
|
||||
LOG.warn(String.valueOf(_request.getHttpURI()), e);
|
||||
else
|
||||
LOG.debug(String.valueOf(_request.getHttpURI()), e);
|
||||
LOG.warn(String.valueOf(_request.getHttpURI()), e);
|
||||
_state.error(e);
|
||||
_request.setHandled(true);
|
||||
|
|
|
@ -412,6 +412,9 @@ public class Server extends HandlerWrapper implements Attributes
|
|||
if (isDumpBeforeStop())
|
||||
dumpStdErr();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("doStop {}",this);
|
||||
|
||||
MultiException mex=new MultiException();
|
||||
|
||||
// list if graceful futures
|
||||
|
@ -422,7 +425,6 @@ public class Server extends HandlerWrapper implements Attributes
|
|||
futures.add(connector.shutdown());
|
||||
|
||||
// Then tell the contexts that we are shutting down
|
||||
|
||||
Handler[] gracefuls = getChildHandlersByClass(Graceful.class);
|
||||
for (Handler graceful : gracefuls)
|
||||
futures.add(((Graceful)graceful).shutdown());
|
||||
|
|
|
@ -25,6 +25,7 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
@ -113,12 +114,10 @@ public class HandlerWrapper extends AbstractHandlerContainer
|
|||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (_handler!=null && isStarted())
|
||||
{
|
||||
_handler.handle(target,baseRequest, request, response);
|
||||
Handler handler=_handler;
|
||||
if (handler!=null)
|
||||
handler.handle(target,baseRequest, request, response);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
|
|
|
@ -198,8 +198,10 @@ public class ShutdownHandler extends HandlerWrapper
|
|||
doShutdown(baseRequest, response);
|
||||
}
|
||||
|
||||
protected void doShutdown(Request baseRequest, HttpServletResponse response) throws IOException {
|
||||
for (Connector connector : getServer().getConnectors()) {
|
||||
protected void doShutdown(Request baseRequest, HttpServletResponse response) throws IOException
|
||||
{
|
||||
for (Connector connector : getServer().getConnectors())
|
||||
{
|
||||
connector.shutdown();
|
||||
}
|
||||
|
||||
|
|
|
@ -30,8 +30,11 @@ import javax.servlet.AsyncListener;
|
|||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.sql.rowset.BaseRowSet;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.AsyncContextEvent;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpChannelState;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
|
@ -135,17 +138,17 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
_dispatchedStats.increment();
|
||||
|
||||
final long start;
|
||||
HttpChannelState state = request.getHttpChannelState();
|
||||
HttpChannelState state = baseRequest.getHttpChannelState();
|
||||
if (state.isInitial())
|
||||
{
|
||||
// new request
|
||||
_requestStats.increment();
|
||||
start = request.getTimeStamp();
|
||||
start = baseRequest.getTimeStamp();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -156,7 +159,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
|
||||
try
|
||||
{
|
||||
super.handle(path, request, httpRequest, httpResponse);
|
||||
Handler handler = getHandler();
|
||||
if (handler!=null && _shutdown.get()==null && isStarted())
|
||||
handler.handle(path, baseRequest, request, response);
|
||||
else
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -178,13 +188,13 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
{
|
||||
long d=_requestStats.decrement();
|
||||
_requestTimeStats.set(dispatched);
|
||||
updateResponse(request);
|
||||
updateResponse(baseRequest);
|
||||
|
||||
// If we have no more dispatches, should we signal shutdown?
|
||||
FutureCallback shutdown = _shutdown.get();
|
||||
if (shutdown!=null)
|
||||
{
|
||||
httpResponse.flushBuffer();
|
||||
response.flushBuffer();
|
||||
if (d==0)
|
||||
shutdown.succeeded();
|
||||
}
|
||||
|
|
|
@ -18,10 +18,22 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -32,109 +44,253 @@ import org.eclipse.jetty.server.handler.StatisticsHandler;
|
|||
import org.eclipse.jetty.util.IO;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class GracefulStopTest
|
||||
{
|
||||
private Server server;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception
|
||||
{
|
||||
server = new Server(0)
|
||||
/**
|
||||
* Test of standard graceful timeout mechanism when a block request does
|
||||
* not complete
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testGracefulNoWaiter() throws Exception
|
||||
{
|
||||
Server server= new Server();
|
||||
server.setStopTimeout(1000);
|
||||
|
||||
};
|
||||
StatisticsHandler stats = new StatisticsHandler();
|
||||
TestHandler test=new TestHandler();
|
||||
server.setHandler(stats);
|
||||
stats.setHandler(test);
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
TestHandler handler = new TestHandler();
|
||||
server.setHandler(handler);
|
||||
|
||||
server.start();
|
||||
final int port=connector.getLocalPort();
|
||||
Socket client = new Socket("127.0.0.1", port);
|
||||
client.getOutputStream().write((
|
||||
"POST / HTTP/1.0\r\n"+
|
||||
"Host: localhost:"+port+"\r\n" +
|
||||
"Content-Type: plain/text\r\n" +
|
||||
"Content-Length: 10\r\n" +
|
||||
"\r\n"+
|
||||
"12345"
|
||||
).getBytes());
|
||||
client.getOutputStream().flush();
|
||||
handler.latch.await();
|
||||
|
||||
long start = System.nanoTime();
|
||||
server.stop();
|
||||
long stop = System.nanoTime();
|
||||
|
||||
// No Graceful waiters
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(stop-start),lessThan(900L));
|
||||
|
||||
assertThat(client.getInputStream().read(),Matchers.is(-1));
|
||||
|
||||
assertThat(handler.handling.get(),Matchers.is(false));
|
||||
assertThat(handler.thrown.get(),instanceOf(ClosedChannelException.class));
|
||||
|
||||
client.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test of standard graceful timeout mechanism when a block request does
|
||||
* not complete
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testGraceful() throws Exception
|
||||
public void testGracefulTimeout() throws Exception
|
||||
{
|
||||
server.setStopTimeout(10 * 1000);
|
||||
Server server= new Server();
|
||||
server.setStopTimeout(1000);
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
TestHandler handler = new TestHandler();
|
||||
StatisticsHandler stats = new StatisticsHandler();
|
||||
server.setHandler(stats);
|
||||
stats.setHandler(handler);
|
||||
|
||||
server.start();
|
||||
final int port=connector.getLocalPort();
|
||||
Socket client = new Socket("127.0.0.1", port);
|
||||
client.getOutputStream().write((
|
||||
"POST / HTTP/1.0\r\n"+
|
||||
"Host: localhost:"+port+"\r\n" +
|
||||
"Content-Type: plain/text\r\n" +
|
||||
"Content-Length: 10\r\n" +
|
||||
"\r\n"+
|
||||
"12345"
|
||||
).getBytes());
|
||||
client.getOutputStream().flush();
|
||||
handler.latch.await();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try
|
||||
{
|
||||
server.stop();
|
||||
Assert.fail();
|
||||
}
|
||||
catch(TimeoutException e)
|
||||
{
|
||||
long stop = System.nanoTime();
|
||||
// No Graceful waiters
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(stop-start),greaterThan(900L));
|
||||
}
|
||||
|
||||
assertThat(client.getInputStream().read(),Matchers.is(-1));
|
||||
|
||||
assertThat(handler.handling.get(),Matchers.is(false));
|
||||
assertThat(handler.thrown.get(),instanceOf(ClosedChannelException.class));
|
||||
|
||||
client.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test of standard graceful timeout mechanism when a block request does
|
||||
* complete. Note that even though the request completes after 100ms, the
|
||||
* stop always takes 1000ms
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testGracefulComplete() throws Exception
|
||||
{
|
||||
Server server= new Server();
|
||||
server.setStopTimeout(10000);
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
TestHandler handler = new TestHandler();
|
||||
StatisticsHandler stats = new StatisticsHandler();
|
||||
server.setHandler(stats);
|
||||
stats.setHandler(handler);
|
||||
|
||||
server.start();
|
||||
final int port=connector.getLocalPort();
|
||||
|
||||
try(final Socket client1 = new Socket("127.0.0.1", port);final Socket client2 = new Socket("127.0.0.1", port);)
|
||||
{
|
||||
client1.getOutputStream().write((
|
||||
"POST / HTTP/1.0\r\n"+
|
||||
"Host: localhost:"+port+"\r\n" +
|
||||
"Content-Type: plain/text\r\n" +
|
||||
"Content-Length: 10\r\n" +
|
||||
"\r\n"+
|
||||
"12345"
|
||||
).getBytes());
|
||||
client1.getOutputStream().flush();
|
||||
handler.latch.await();
|
||||
|
||||
new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long end = now+500;
|
||||
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
server.stop();
|
||||
Thread.sleep(100);
|
||||
|
||||
// Try creating a new connection
|
||||
try
|
||||
{
|
||||
new Socket("127.0.0.1", port);
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
catch (Exception e)
|
||||
catch(ConnectException e)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
// Try another request on existing connection
|
||||
|
||||
client2.getOutputStream().write((
|
||||
"GET / HTTP/1.0\r\n"+
|
||||
"Host: localhost:"+port+"\r\n" +
|
||||
"\r\n"
|
||||
).getBytes());
|
||||
client2.getOutputStream().flush();
|
||||
String response2 = IO.toString(client2.getInputStream());
|
||||
assertThat(response2, containsString(" 503 "));
|
||||
|
||||
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
Thread.sleep(end-now);
|
||||
client1.getOutputStream().write("567890".getBytes());
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
try(Socket socket = new Socket("localhost",server.getBean(NetworkConnector.class).getLocalPort());)
|
||||
{
|
||||
socket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
|
||||
String out = IO.toString(socket.getInputStream());
|
||||
Assert.assertThat(out,Matchers.containsString("200 OK"));
|
||||
}
|
||||
|
||||
server.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGracefulTimeout() throws Exception
|
||||
{
|
||||
server.setStopTimeout(100);
|
||||
server.start();
|
||||
|
||||
new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
long start = System.nanoTime();
|
||||
server.stop();
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
long stop = System.nanoTime();
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(stop-start),greaterThan(500L));
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(stop-start),lessThan(10000L));
|
||||
|
||||
try(Socket socket = new Socket("localhost",server.getBean(NetworkConnector.class).getLocalPort());)
|
||||
{
|
||||
socket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
|
||||
String out = IO.toString(socket.getInputStream());
|
||||
Assert.assertEquals("",out);
|
||||
String response = IO.toString(client1.getInputStream());
|
||||
|
||||
assertThat(handler.handling.get(),Matchers.is(false));
|
||||
assertThat(response, containsString(" 200 OK"));
|
||||
assertThat(response, containsString("read 10/10"));
|
||||
|
||||
assertThat(stats.getRequests(),Matchers.is(2));
|
||||
assertThat(stats.getResponses5xx(),Matchers.is(1));
|
||||
}
|
||||
}
|
||||
|
||||
server.join();
|
||||
}
|
||||
|
||||
private static class TestHandler extends AbstractHandler
|
||||
static class TestHandler extends AbstractHandler
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
|
||||
final AtomicBoolean handling = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void handle(final String s, final Request request, final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse)
|
||||
public void handle(String target, Request baseRequest,
|
||||
HttpServletRequest request, HttpServletResponse response)
|
||||
throws IOException, ServletException
|
||||
{
|
||||
handling.set(true);
|
||||
latch.countDown();
|
||||
int c=0;
|
||||
try
|
||||
{
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
int content_length = request.getContentLength();
|
||||
InputStream in = request.getInputStream();
|
||||
|
||||
while(true)
|
||||
{
|
||||
if (in.read()<0)
|
||||
break;
|
||||
c++;
|
||||
}
|
||||
|
||||
httpServletResponse.getWriter().write("OK");
|
||||
httpServletResponse.setStatus(200);
|
||||
request.setHandled(true);
|
||||
baseRequest.setHandled(true);
|
||||
response.setStatus(200);
|
||||
response.getWriter().printf("read %d/%d%n",c,content_length);
|
||||
}
|
||||
catch(Throwable th)
|
||||
{
|
||||
thrown.set(th);
|
||||
}
|
||||
finally
|
||||
{
|
||||
handling.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletRequest;
|
|||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -82,6 +83,9 @@ public class ScopedHandlerTest
|
|||
@Test
|
||||
public void testDouble() throws Exception
|
||||
{
|
||||
Request request = new Request(null,null);
|
||||
Response response = new Response(null,null);
|
||||
|
||||
TestHandler handler0 = new TestHandler("0");
|
||||
OtherHandler handlerA = new OtherHandler("A");
|
||||
TestHandler handler1 = new TestHandler("1");
|
||||
|
@ -90,7 +94,7 @@ public class ScopedHandlerTest
|
|||
handlerA.setHandler(handler1);
|
||||
handler1.setHandler(handlerB);
|
||||
handler0.start();
|
||||
handler0.handle("target",null,null,null);
|
||||
handler0.handle("target",request,request,response);
|
||||
handler0.stop();
|
||||
String history=_history.toString();
|
||||
assertEquals(">S0>S1>W0>HA>W1>HB<HB<W1<HA<W0<S1<S0",history);
|
||||
|
@ -99,6 +103,9 @@ public class ScopedHandlerTest
|
|||
@Test
|
||||
public void testTriple() throws Exception
|
||||
{
|
||||
Request request = new Request(null,null);
|
||||
Response response = new Response(null,null);
|
||||
|
||||
TestHandler handler0 = new TestHandler("0");
|
||||
OtherHandler handlerA = new OtherHandler("A");
|
||||
TestHandler handler1 = new TestHandler("1");
|
||||
|
@ -111,7 +118,7 @@ public class ScopedHandlerTest
|
|||
handlerB.setHandler(handler2);
|
||||
handler2.setHandler(handlerC);
|
||||
handler0.start();
|
||||
handler0.handle("target",null,null,null);
|
||||
handler0.handle("target",request,request,response);
|
||||
handler0.stop();
|
||||
String history=_history.toString();
|
||||
assertEquals(">S0>S1>S2>W0>HA>W1>HB>W2>HC<HC<W2<HB<W1<HA<W0<S2<S1<S0",history);
|
||||
|
|
|
@ -143,7 +143,7 @@ public class AsyncServletTest
|
|||
{
|
||||
_expectedCode="500 ";
|
||||
String response=process("suspend=200",null);
|
||||
assertEquals("HTTP/1.1 500 Async Timeout",response.substring(0,26));
|
||||
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 500 Async Timeout"));
|
||||
assertContains(
|
||||
"history: REQUEST /ctx/path/info\r\n"+
|
||||
"history: initial\r\n"+
|
||||
|
|
Loading…
Reference in New Issue