From 28566c72c8bb091b272fd7b25fb7b4389363b024 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 24 Oct 2013 17:22:08 +1100 Subject: [PATCH 1/2] 420142 reimplemented graceful shutdown --- .../java/org/eclipse/jetty/server/Server.java | 7 +- .../server/handler/StatisticsHandler.java | 54 ++++++++- .../jetty/server/GracefulStopTest.java | 105 ++++++++++++++++++ .../util/statistic/CounterStatistic.java | 24 ++-- 4 files changed, 167 insertions(+), 23 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 31a67de487d..0fbc89f6f56 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -359,9 +359,10 @@ public class Server extends HandlerWrapper implements Attributes futures.add(connector.shutdown()); // Then tell the contexts that we are shutting down - Handler[] contexts = getChildHandlersByClass(Graceful.class); - for (Handler context : contexts) - futures.add(((Graceful)context).shutdown()); + + Handler[] gracefuls = getChildHandlersByClass(Graceful.class); + for (Handler graceful : gracefuls) + futures.add(((Graceful)graceful).shutdown()); // Shall we gracefully wait for zero connections? long stopTimeout = getStopTimeout(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index 78e8eb71ef3..bc93a6501cb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -19,8 +19,11 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; @@ -32,14 +35,16 @@ import org.eclipse.jetty.server.AsyncContextEvent; import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.statistic.CounterStatistic; import org.eclipse.jetty.util.statistic.SampleStatistic; @ManagedObject("Request Statistics Gathering") -public class StatisticsHandler extends HandlerWrapper +public class StatisticsHandler extends HandlerWrapper implements Graceful { private final AtomicLong _statsStartedAt = new AtomicLong(); @@ -59,6 +64,8 @@ public class StatisticsHandler extends HandlerWrapper private final AtomicInteger _responses5xx = new AtomicInteger(); private final AtomicLong _responsesTotalBytes = new AtomicLong(); + private final AtomicReference _shutdown=new AtomicReference<>(); + private final AsyncListener _onCompletion = new AsyncListener() { @Override @@ -81,21 +88,27 @@ public class StatisticsHandler extends HandlerWrapper @Override public void onComplete(AsyncEvent event) throws IOException { - HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState(); Request request = state.getBaseRequest(); final long elapsed = System.currentTimeMillis()-request.getTimeStamp(); - _requestStats.decrement(); + long d=_requestStats.decrement(); _requestTimeStats.set(elapsed); updateResponse(request); if (!state.isDispatched()) _asyncWaitStats.decrement(); + + // If we have no more dispatches, should we signal shutdown? + if (d==0) + { + FutureCallback shutdown = _shutdown.get(); + if (shutdown!=null) + shutdown.succeeded(); + } } - }; /** @@ -164,9 +177,18 @@ public class StatisticsHandler extends HandlerWrapper } else if (state.isInitial()) { - _requestStats.decrement(); + long d=_requestStats.decrement(); _requestTimeStats.set(dispatched); updateResponse(request); + + // If we have no more dispatches, should we signal shutdown? + FutureCallback shutdown = _shutdown.get(); + if (shutdown!=null) + { + httpResponse.flushBuffer(); + if (d==0) + shutdown.succeeded(); + } } // else onCompletion will handle it. } @@ -207,9 +229,20 @@ public class StatisticsHandler extends HandlerWrapper @Override protected void doStart() throws Exception { + _shutdown.set(null); super.doStart(); statsReset(); } + + + @Override + protected void doStop() throws Exception + { + super.doStop(); + FutureCallback shutdown = _shutdown.get(); + if (shutdown!=null && !shutdown.isDone()) + shutdown.failed(new TimeoutException()); + } /** * @return the number of requests handled by this handler @@ -525,4 +558,15 @@ public class StatisticsHandler extends HandlerWrapper return sb.toString(); } + + @Override + public Future shutdown() + { + FutureCallback shutdown=new FutureCallback(false); + _shutdown.compareAndSet(null,shutdown); + shutdown=_shutdown.get(); + if (_dispatchedStats.getCurrent()==0) + shutdown.succeeded(); + return shutdown; + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java new file mode 100644 index 00000000000..0c11502ff68 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java @@ -0,0 +1,105 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.TimeUnit; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.StringUtil; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GracefulStopTest +{ + private Server server; + + @Before + public void setup() throws Exception + { + server = new Server(0); + StatisticsHandler stats = new StatisticsHandler(); + TestHandler test=new TestHandler(); + server.setHandler(stats); + stats.setHandler(test); + server.setStopTimeout(10 * 1000); + + server.start(); + } + + @Test + public void testGraceful() throws Exception + { + new Thread() + { + @Override + public void run() + { + try + { + TimeUnit.SECONDS.sleep(1); + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }.start(); + + try(Socket socket = new Socket("localhost",server.getBean(NetworkConnector.class).getLocalPort());) + { + socket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StringUtil.__ISO_8859_1_CHARSET)); + String out = IO.toString(socket.getInputStream()); + Assert.assertThat(out,Matchers.containsString("200 OK")); + } + } + + private static class TestHandler extends AbstractHandler + { + @Override + public void handle(final String s, final Request request, final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse) + throws IOException, ServletException + { + try + { + TimeUnit.SECONDS.sleep(2); + } + catch (InterruptedException e) + { + } + + httpServletResponse.getWriter().write("OK"); + httpServletResponse.setStatus(200); + request.setHandled(true); + } + } + +} \ No newline at end of file diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/statistic/CounterStatistic.java b/jetty-util/src/main/java/org/eclipse/jetty/util/statistic/CounterStatistic.java index 67699911c9b..51a82ea3bb4 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/statistic/CounterStatistic.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/statistic/CounterStatistic.java @@ -55,37 +55,31 @@ public class CounterStatistic /** * @param delta the amount to add to the count */ - public void add(final long delta) + public long add(final long delta) { long value=_curr.addAndGet(delta); if (delta > 0) + { _total.addAndGet(delta); - Atomics.updateMax(_max,value); - } - - /* ------------------------------------------------------------ */ - /** - * @param delta the amount to subtract the count by. - */ - public void subtract(final long delta) - { - add(-delta); + Atomics.updateMax(_max,value); + } + return value; } /* ------------------------------------------------------------ */ /** */ - public void increment() + public long increment() { - add(1); + return add(1); } /* ------------------------------------------------------------ */ /** */ - public void decrement() + public long decrement() { - add(-1); + return add(-1); } /* ------------------------------------------------------------ */ From a28e4730ad48012b367900ee5a8b40e95462dde2 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 25 Oct 2013 14:08:55 +0200 Subject: [PATCH 2/2] 420362 - Response/request listeners called too many times. Wrapped on[Request|Response]XXX(XXXListener) listeners into their specific interface so that they don't get notified multiple times. --- .../org/eclipse/jetty/client/HttpRequest.java | 145 ++++++++++++++---- .../org/eclipse/jetty/client/HttpSender.java | 2 +- .../eclipse/jetty/client/HttpClientTest.java | 144 +++++++++++++++++ 3 files changed, 264 insertions(+), 27 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 3ea2e0e6c22..0986a6af13f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -23,6 +23,7 @@ import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URLDecoder; import java.net.URLEncoder; +import java.nio.ByteBuffer; import java.nio.charset.UnsupportedCharsetException; import java.nio.file.Path; import java.util.ArrayList; @@ -43,6 +44,7 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.client.util.PathContentProvider; +import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; @@ -295,93 +297,184 @@ public class HttpRequest implements Request } @Override - public Request onRequestQueued(QueuedListener listener) + public Request onRequestQueued(final QueuedListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new QueuedListener() + { + @Override + public void onQueued(Request request) + { + listener.onQueued(request); + } + }); return this; } @Override - public Request onRequestBegin(BeginListener listener) + public Request onRequestBegin(final BeginListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new BeginListener() + { + @Override + public void onBegin(Request request) + { + listener.onBegin(request); + } + }); return this; } @Override - public Request onRequestHeaders(HeadersListener listener) + public Request onRequestHeaders(final HeadersListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new HeadersListener() + { + @Override + public void onHeaders(Request request) + { + listener.onHeaders(request); + } + }); return this; } @Override - public Request onRequestCommit(CommitListener listener) + public Request onRequestCommit(final CommitListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new CommitListener() + { + @Override + public void onCommit(Request request) + { + listener.onCommit(request); + } + }); return this; } @Override - public Request onRequestContent(ContentListener listener) + public Request onRequestContent(final ContentListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new ContentListener() + { + @Override + public void onContent(Request request, ByteBuffer content) + { + listener.onContent(request, content); + } + }); return this; } @Override - public Request onRequestSuccess(SuccessListener listener) + public Request onRequestSuccess(final SuccessListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new SuccessListener() + { + @Override + public void onSuccess(Request request) + { + listener.onSuccess(request); + } + }); return this; } @Override - public Request onRequestFailure(FailureListener listener) + public Request onRequestFailure(final FailureListener listener) { - this.requestListeners.add(listener); + this.requestListeners.add(new FailureListener() + { + @Override + public void onFailure(Request request, Throwable failure) + { + listener.onFailure(request, failure); + } + }); return this; } @Override - public Request onResponseBegin(Response.BeginListener listener) + public Request onResponseBegin(final Response.BeginListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.BeginListener() + { + @Override + public void onBegin(Response response) + { + listener.onBegin(response); + } + }); return this; } @Override - public Request onResponseHeader(Response.HeaderListener listener) + public Request onResponseHeader(final Response.HeaderListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.HeaderListener() + { + @Override + public boolean onHeader(Response response, HttpField field) + { + return listener.onHeader(response, field); + } + }); return this; } @Override - public Request onResponseHeaders(Response.HeadersListener listener) + public Request onResponseHeaders(final Response.HeadersListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.HeadersListener() + { + @Override + public void onHeaders(Response response) + { + listener.onHeaders(response); + } + }); return this; } @Override - public Request onResponseContent(Response.ContentListener listener) + public Request onResponseContent(final Response.ContentListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.ContentListener() + { + @Override + public void onContent(Response response, ByteBuffer content) + { + listener.onContent(response, content); + } + }); return this; } @Override - public Request onResponseSuccess(Response.SuccessListener listener) + public Request onResponseSuccess(final Response.SuccessListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.SuccessListener() + { + @Override + public void onSuccess(Response response) + { + listener.onSuccess(response); + } + }); return this; } @Override - public Request onResponseFailure(Response.FailureListener listener) + public Request onResponseFailure(final Response.FailureListener listener) { - this.responseListeners.add(listener); + this.responseListeners.add(new Response.FailureListener() + { + @Override + public void onFailure(Response response, Throwable failure) + { + listener.onFailure(response, failure); + } + }); return this; } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index eff0576761c..4d607b803e4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -360,7 +360,7 @@ public class HttpSender implements AsyncContentProvider.Listener if (!commit(request)) return false; - if (content != null) + if (content != null && content.hasRemaining()) { RequestNotifier notifier = connection.getDestination().getRequestNotifier(); notifier.notifyContent(request, content); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 9d80ad3b722..9fc6a81630f 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -860,4 +860,148 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); } + + @Test + public void testRequestListenerForMultipleEventsIsInvokedOncePerEvent() throws Exception + { + start(new EmptyServerHandler()); + + final AtomicInteger counter = new AtomicInteger(); + Request.Listener listener = new Request.Listener() + { + @Override + public void onQueued(Request request) + { + counter.incrementAndGet(); + } + + @Override + public void onBegin(Request request) + { + counter.incrementAndGet(); + } + + @Override + public void onHeaders(Request request) + { + counter.incrementAndGet(); + } + + @Override + public void onCommit(Request request) + { + counter.incrementAndGet(); + } + + @Override + public void onContent(Request request, ByteBuffer content) + { + // Should not be invoked + counter.incrementAndGet(); + } + + @Override + public void onFailure(Request request, Throwable failure) + { + // Should not be invoked + counter.incrementAndGet(); + } + + @Override + public void onSuccess(Request request) + { + counter.incrementAndGet(); + } + }; + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onRequestQueued(listener) + .onRequestBegin(listener) + .onRequestHeaders(listener) + .onRequestCommit(listener) + .onRequestContent(listener) + .onRequestSuccess(listener) + .onRequestFailure(listener) + .listener(listener) + .send(); + + Assert.assertEquals(200, response.getStatus()); + int expectedEventsTriggeredByOnRequestXXXListeners = 5; + int expectedEventsTriggeredByListener = 5; + int expected = expectedEventsTriggeredByOnRequestXXXListeners + expectedEventsTriggeredByListener; + Assert.assertEquals(expected, counter.get()); + } + + @Test + public void testResponseListenerForMultipleEventsIsInvokedOncePerEvent() throws Exception + { + start(new EmptyServerHandler()); + + final AtomicInteger counter = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + Response.Listener listener = new Response.Listener() + { + @Override + public void onBegin(Response response) + { + counter.incrementAndGet(); + } + + @Override + public boolean onHeader(Response response, HttpField field) + { + // Number of header may vary, so don't count + return true; + } + + @Override + public void onHeaders(Response response) + { + counter.incrementAndGet(); + } + + @Override + public void onContent(Response response, ByteBuffer content) + { + // Should not be invoked + counter.incrementAndGet(); + } + + @Override + public void onSuccess(Response response) + { + counter.incrementAndGet(); + } + + @Override + public void onFailure(Response response, Throwable failure) + { + // Should not be invoked + counter.incrementAndGet(); + } + + @Override + public void onComplete(Result result) + { + Assert.assertEquals(200, result.getResponse().getStatus()); + counter.incrementAndGet(); + latch.countDown(); + } + }; + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onResponseBegin(listener) + .onResponseHeader(listener) + .onResponseHeaders(listener) + .onResponseContent(listener) + .onResponseSuccess(listener) + .onResponseFailure(listener) + .send(listener); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + int expectedEventsTriggeredByOnResponseXXXListeners = 3; + int expectedEventsTriggeredByCompletionListener = 4; + int expected = expectedEventsTriggeredByOnResponseXXXListeners + expectedEventsTriggeredByCompletionListener; + Assert.assertEquals(expected, counter.get()); + } }