diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index b3780d26b33..78a8171aa4a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -87,6 +88,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump protected void doStart() throws Exception { super.doStart(); + startSelector(); + } + + protected void startSelector() throws IOException + { _selector = newSelector(); _selectorManager.execute(this); } @@ -96,12 +102,26 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump return Selector.open(); } + public Selector getSelector() + { + return _selector; + } + + protected void onSelectFailed(Throwable cause) throws IOException + { + LOG.info("Restarting selector: " + toString(), cause); + startSelector(); + } + public int size() { Selector s = _selector; if (s == null) return 0; - return s.keys().size(); + Set keys = s.keys(); + if (keys == null) + return 0; + return keys.size(); } @Override @@ -258,14 +278,17 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { + Selector selector = _selector; if (isRunning()) - LOG.warn(x); + { + notifySelectFailed(x); + } else { LOG.warn(x.toString()); LOG.debug(x); } - closeNoExceptions(_selector); + IO.close(selector); } return false; } @@ -306,13 +329,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { LOG.debug("Ignoring cancelled key for channel {}", key.channel()); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } catch (Throwable x) { LOG.warn("Could not process key for channel " + key.channel(), x); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } else @@ -321,7 +344,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); Object attachment = key.attachment(); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } return null; @@ -342,6 +365,18 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } + private void notifySelectFailed(Throwable cause) + { + try + { + onSelectFailed(cause); + } + catch (IOException e) + { + LOG.info("Failure while calling onSelectFailed()", e); + } + } + private interface Product extends Runnable { } @@ -400,24 +435,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); LOG.warn("Accept failed for channel " + channel, x); } } - private void closeNoExceptions(Closeable closeable) - { - try - { - if (closeable != null) - closeable.close(); - } - catch (Throwable x) - { - LOG.ignore(x); - } - } - private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); @@ -538,7 +560,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(_channel); + IO.close(_channel); LOG.warn(x); } } @@ -559,7 +581,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump public void close() { LOG.debug("closed accept of {}", channel); - closeNoExceptions(channel); + IO.close(channel); } @Override @@ -572,7 +594,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); LOG.debug(x); } } @@ -607,12 +629,12 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump public void close() { LOG.debug("closed creation of {}", channel); - closeNoExceptions(channel); + IO.close(channel); } protected void failed(Throwable failure) { - closeNoExceptions(channel); + IO.close(channel); LOG.debug(failure); } } @@ -649,7 +671,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump if (failed.compareAndSet(false, true)) { timeout.cancel(); - closeNoExceptions(channel); + IO.close(channel); ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment); } } @@ -738,7 +760,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump @Override public void run() { - closeNoExceptions(_endPoint.getConnection()); + IO.close(_endPoint.getConnection()); _latch.countDown(); } } @@ -752,7 +774,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { Selector selector = _selector; _selector = null; - closeNoExceptions(selector); + IO.close(selector); _latch.countDown(); } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java new file mode 100644 index 00000000000..365b90d1d1b --- /dev/null +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java @@ -0,0 +1,410 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.test; + +import java.io.IOException; +import java.nio.channels.Selector; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; +import org.eclipse.jetty.util.thread.strategy.ProduceConsume; +import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class RebuildSelectorTest +{ + @Parameterized.Parameters(name = "{0}") + public static Object[][] params() + { + return new Object[][] + { + {"EPC", new ExecuteProduceConsume.Factory()}, + {"PEC", new ProduceExecuteConsume.Factory()}, + {"PC", new ProduceConsume.Factory()} + }; + } + + private HttpClient client; + private Server server; + private AsyncCloseSelectorServlet asyncCloseSelectorServlet; + + @Parameterized.Parameter(0) + public String testMode; + + @Parameterized.Parameter(1) + public ExecutionStrategy.Factory executionStrategy; + + @After + public void stopServer() throws Exception + { + server.stop(); + } + + @Before + public void startClient() throws Exception + { + client = new HttpClient(); + client.setIdleTimeout(2000); + client.setMaxConnectionsPerDestination(1); + client.start(); + } + + @After + public void stopClient() throws Exception + { + client.stop(); + } + + public void startServer(Function customizeServerConsumer) throws Exception + { + server = new Server(); + + ServerConnector connector = customizeServerConsumer.apply(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(HelloServlet.class, "/hello"); + + ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector)); + context.addServlet(closeHolder, "/selector/close"); + + asyncCloseSelectorServlet = new AsyncCloseSelectorServlet(connector); + ServletHolder asyncCloseHolder = new ServletHolder(asyncCloseSelectorServlet); + asyncCloseHolder.setAsyncSupported(true); + context.addServlet(asyncCloseHolder, "/selector/async-close"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @Test + public void testRebuildServerSelectorNormal() throws Exception + { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + CustomServerConnector connector = new CustomServerConnector(server, executionStrategy, failedLatch, 1, 1); + connector.setPort(0); + return connector; + }); + + // Request /hello + assertRequestHello(); + + // Request /selector/close + assertRequestSelectorClose("/selector/close"); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); + } + + @Test + @Ignore + public void testRebuildServerSelectorAsync() throws Exception + { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + CustomServerConnector connector = new CustomServerConnector(server, executionStrategy, failedLatch, 1, 1); + connector.setPort(0); + return connector; + }); + + // Request /hello + assertRequestHello(); + + // Request /selector/async-close + assertRequestSelectorClose("/selector/async-close"); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Ensure that Async Listener onError was called + assertTrue(asyncCloseSelectorServlet.onErrorLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); + } + + private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException + { + ContentResponse response = client.newRequest(server.getURI().resolve(path)) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat("/selector/close status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat("/selector/close response", response.getContentAsString(), startsWith("Closing selectors ")); + } + + private void assertRequestHello() throws InterruptedException, ExecutionException, TimeoutException + { + ContentResponse response = client.newRequest(server.getURI().resolve("/hello")) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat("/hello status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat("/hello response", response.getContentAsString(), startsWith("Hello ")); + } + + public static class CustomServerConnector extends ServerConnector + { + private final ExecutionStrategy.Factory strategyFactory; + private final CountDownLatch failedLatch; + + public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, CountDownLatch failedLatch, int acceptors, int selectors) + { + super(server, acceptors, selectors); + this.strategyFactory = strategyFactory; + this.failedLatch = failedLatch; + } + + @Override + public ExecutionStrategy.Factory getExecutionStrategyFactory() + { + return this.strategyFactory; + } + + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new CustomManagedSelector(this, id, failedLatch, getExecutionStrategyFactory()); + } + }; + } + } + + public static class CustomManagedSelector extends ManagedSelector + { + private static final Logger LOG = Log.getLogger(CustomManagedSelector.class); + private final CountDownLatch failedLatch; + + public CustomManagedSelector(SelectorManager selectorManager, int id, CountDownLatch failedLatch, ExecutionStrategy.Factory executionFactory) + { + super(selectorManager, id, executionFactory); + this.failedLatch = failedLatch; + } + + @Override + protected void onSelectFailed(Throwable cause) + { + try + { + LOG.debug("onSelectFailed()", cause); + this.startSelector(); + } + catch (Exception ex) + { + LOG.warn(ex); + } + failedLatch.countDown(); + } + } + + public static class HelloServlet extends HttpServlet + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.getWriter().printf("Hello %s:%d%n", req.getRemoteAddr(), req.getRemotePort()); + } + } + + private static class InterruptSelector implements Runnable + { + private static final Logger LOG = Log.getLogger(InterruptSelector.class); + private final ServerConnector connector; + + public InterruptSelector(ServerConnector connector) + { + this.connector = connector; + } + + @Override + public void run() + { + SelectorManager selectorManager = connector.getSelectorManager(); + Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); + for (ManagedSelector managedSelector : managedSelectors) + { + if (managedSelector instanceof CustomManagedSelector) + { + CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; + Selector selector = customManagedSelector.getSelector(); + LOG.debug("Closing selector {}}", selector); + IO.close(selector); + } + } + } + } + + public static class CloseSelectorServlet extends HttpServlet + { + private static final int DELAY_MS = 500; + private ServerConnector connector; + private ScheduledExecutorService scheduledExecutorService; + + public CloseSelectorServlet(ServerConnector connector) + { + this.connector = connector; + scheduledExecutorService = Executors.newScheduledThreadPool(5); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); + scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); + } + } + + public static class AsyncCloseSelectorServlet extends HttpServlet + { + private static final int DELAY_MS = 200; + private ServerConnector connector; + private ScheduledExecutorService scheduledExecutorService; + public CountDownLatch onErrorLatch = new CountDownLatch(1); + + public AsyncCloseSelectorServlet(ServerConnector connector) + { + this.connector = connector; + scheduledExecutorService = Executors.newScheduledThreadPool(5); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + ServletOutputStream out = resp.getOutputStream(); + out.print("Closing selectors " + DELAY_MS); + + AsyncContext asyncContext = req.startAsync(); + asyncContext.setTimeout(0); + asyncContext.addListener(new AsyncListener() + { + @Override + public void onComplete(AsyncEvent event) + { + } + + @Override + public void onTimeout(AsyncEvent event) + { + } + + @Override + public void onError(AsyncEvent event) + { + resp.setStatus(500); + event.getAsyncContext().complete(); + onErrorLatch.countDown(); + } + + @Override + public void onStartAsync(AsyncEvent event) + { + } + }); + + scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); + /* trigger EofException after selector close + scheduledExecutorService.schedule(() -> + { + byte[] b = new byte[128 * 1024 * 1024]; + Arrays.fill(b, (byte)'x'); + try + { + out.write(b); + out.flush(); + } + catch (IOException e) + { + e.printStackTrace(System.out); + } + }, DELAY_MS * 2, TimeUnit.MILLISECONDS); + */ + } + } +}