Merge remote-tracking branch 'origin/master' into jetty-9.1

Conflicts:
	jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
	jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java
This commit is contained in:
Greg Wilkins 2013-10-26 11:01:25 +11:00
commit 05cab9bfb4
4 changed files with 167 additions and 22 deletions

View File

@ -355,9 +355,10 @@ public class Server extends HandlerWrapper implements Attributes
futures.add(connector.shutdown()); futures.add(connector.shutdown());
// Then tell the contexts that we are shutting down // Then tell the contexts that we are shutting down
Handler[] contexts = getChildHandlersByClass(Graceful.class);
for (Handler context : contexts) Handler[] gracefuls = getChildHandlersByClass(Graceful.class);
futures.add(((Graceful)context).shutdown()); for (Handler graceful : gracefuls)
futures.add(((Graceful)graceful).shutdown());
// Shall we gracefully wait for zero connections? // Shall we gracefully wait for zero connections?
long stopTimeout = getStopTimeout(); long stopTimeout = getStopTimeout();

View File

@ -19,8 +19,11 @@
package org.eclipse.jetty.server.handler; package org.eclipse.jetty.server.handler;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncEvent; import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener; import javax.servlet.AsyncListener;
@ -32,14 +35,16 @@ import org.eclipse.jetty.server.AsyncContextEvent;
import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.statistic.CounterStatistic; import org.eclipse.jetty.util.statistic.CounterStatistic;
import org.eclipse.jetty.util.statistic.SampleStatistic; import org.eclipse.jetty.util.statistic.SampleStatistic;
@ManagedObject("Request Statistics Gathering") @ManagedObject("Request Statistics Gathering")
public class StatisticsHandler extends HandlerWrapper public class StatisticsHandler extends HandlerWrapper implements Graceful
{ {
private final AtomicLong _statsStartedAt = new AtomicLong(); private final AtomicLong _statsStartedAt = new AtomicLong();
@ -59,6 +64,8 @@ public class StatisticsHandler extends HandlerWrapper
private final AtomicInteger _responses5xx = new AtomicInteger(); private final AtomicInteger _responses5xx = new AtomicInteger();
private final AtomicLong _responsesTotalBytes = new AtomicLong(); private final AtomicLong _responsesTotalBytes = new AtomicLong();
private final AtomicReference<FutureCallback> _shutdown=new AtomicReference<>();
private final AsyncListener _onCompletion = new AsyncListener() private final AsyncListener _onCompletion = new AsyncListener()
{ {
@Override @Override
@ -86,14 +93,21 @@ public class StatisticsHandler extends HandlerWrapper
Request request = state.getBaseRequest(); Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis()-request.getTimeStamp(); final long elapsed = System.currentTimeMillis()-request.getTimeStamp();
_requestStats.decrement(); long d=_requestStats.decrement();
_requestTimeStats.set(elapsed); _requestTimeStats.set(elapsed);
updateResponse(request); updateResponse(request);
_asyncWaitStats.decrement(); _asyncWaitStats.decrement();
}
// If we have no more dispatches, should we signal shutdown?
if (d==0)
{
FutureCallback shutdown = _shutdown.get();
if (shutdown!=null)
shutdown.succeeded();
}
}
}; };
/** /**
@ -162,9 +176,18 @@ public class StatisticsHandler extends HandlerWrapper
} }
else if (state.isInitial()) else if (state.isInitial())
{ {
_requestStats.decrement(); long d=_requestStats.decrement();
_requestTimeStats.set(dispatched); _requestTimeStats.set(dispatched);
updateResponse(request); updateResponse(request);
// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown!=null)
{
httpResponse.flushBuffer();
if (d==0)
shutdown.succeeded();
}
} }
// else onCompletion will handle it. // else onCompletion will handle it.
} }
@ -205,10 +228,21 @@ public class StatisticsHandler extends HandlerWrapper
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
_shutdown.set(null);
super.doStart(); super.doStart();
statsReset(); statsReset();
} }
@Override
protected void doStop() throws Exception
{
super.doStop();
FutureCallback shutdown = _shutdown.get();
if (shutdown!=null && !shutdown.isDone())
shutdown.failed(new TimeoutException());
}
/** /**
* @return the number of requests handled by this handler * @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding * since {@link #statsReset()} was last called, excluding
@ -523,4 +557,15 @@ public class StatisticsHandler extends HandlerWrapper
return sb.toString(); return sb.toString();
} }
@Override
public Future<Void> shutdown()
{
FutureCallback shutdown=new FutureCallback(false);
_shutdown.compareAndSet(null,shutdown);
shutdown=_shutdown.get();
if (_dispatchedStats.getCurrent()==0)
shutdown.succeeded();
return shutdown;
}
} }

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class GracefulStopTest
{
private Server server;
@Before
public void setup() throws Exception
{
server = new Server(0);
StatisticsHandler stats = new StatisticsHandler();
TestHandler test=new TestHandler();
server.setHandler(stats);
stats.setHandler(test);
server.setStopTimeout(10 * 1000);
server.start();
}
@Test
public void testGraceful() throws Exception
{
new Thread()
{
@Override
public void run()
{
try
{
TimeUnit.SECONDS.sleep(1);
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}.start();
try(Socket socket = new Socket("localhost",server.getBean(NetworkConnector.class).getLocalPort());)
{
socket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StringUtil.__ISO_8859_1_CHARSET));
String out = IO.toString(socket.getInputStream());
Assert.assertThat(out,Matchers.containsString("200 OK"));
}
}
private static class TestHandler extends AbstractHandler
{
@Override
public void handle(final String s, final Request request, final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse)
throws IOException, ServletException
{
try
{
TimeUnit.SECONDS.sleep(2);
}
catch (InterruptedException e)
{
}
httpServletResponse.getWriter().write("OK");
httpServletResponse.setStatus(200);
request.setHandled(true);
}
}
}

View File

@ -55,37 +55,31 @@ public class CounterStatistic
/** /**
* @param delta the amount to add to the count * @param delta the amount to add to the count
*/ */
public void add(final long delta) public long add(final long delta)
{ {
long value=_curr.addAndGet(delta); long value=_curr.addAndGet(delta);
if (delta > 0) if (delta > 0)
{
_total.addAndGet(delta); _total.addAndGet(delta);
Atomics.updateMax(_max,value); Atomics.updateMax(_max,value);
} }
return value;
/* ------------------------------------------------------------ */
/**
* @param delta the amount to subtract the count by.
*/
public void subtract(final long delta)
{
add(-delta);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
*/ */
public void increment() public long increment()
{ {
add(1); return add(1);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
*/ */
public void decrement() public long decrement()
{ {
add(-1); return add(-1);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */