diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index d048ce6b312..a39890d7557 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; @@ -122,12 +123,20 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable start._started.await(); } + protected void onSelectFailed(Throwable cause) + { + // override to change behavior + } + public int size() { Selector s = _selector; if (s == null) return 0; - return s.keys().size(); + Set keys = s.keys(); + if (keys == null) + return 0; + return keys.size(); } @Override @@ -135,7 +144,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { // doStop might be called for a failed managedSelector, // We do not want to wait twice, so we only stop once for each start - if (_started.compareAndSet(true, false)) + if (_started.compareAndSet(true, false) && _selector != null) { // Close connections, but only wait a single selector cycle for it to take effect CloseConnections closeConnections = new CloseConnections(); @@ -210,7 +219,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable catch (RejectedExecutionException x) { if (task instanceof Closeable) - closeNoExceptions((Closeable)task); + IO.close((Closeable)task); } } @@ -246,17 +255,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private static void closeNoExceptions(Closeable closeable) + protected void endPointOpened(EndPoint endPoint) { - try - { - if (closeable != null) - closeable.close(); - } - catch (Throwable x) - { - LOG.ignore(x); - } + _selectorManager.endPointOpened(endPoint); + } + + protected void endPointClosed(EndPoint endPoint) + { + _selectorManager.endPointClosed(endPoint); } private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException @@ -266,7 +272,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable endPoint.setConnection(connection); selectionKey.attach(endPoint); endPoint.onOpen(); - _selectorManager.endPointOpened(endPoint); + endPointOpened(endPoint); _selectorManager.connectionOpened(connection); if (LOG.isDebugEnabled()) LOG.debug("Created {}", endPoint); @@ -496,15 +502,19 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { + IO.close(_selector); _selector = null; + if (isRunning()) - LOG.warn(x); + { + LOG.warn("Fatal select() failure", x); + onSelectFailed(x); + } else { LOG.warn(x.toString()); LOG.debug(x); } - closeNoExceptions(_selector); } return false; } @@ -541,13 +551,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { LOG.debug("Ignoring cancelled key for channel {}", key.channel()); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } catch (Throwable x) { LOG.warn("Could not process key for channel " + key.channel(), x); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } else @@ -556,7 +566,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); Object attachment = key.attachment(); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } return null; @@ -661,7 +671,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { - closeNoExceptions(_channel); + IO.close(_channel); LOG.warn(x); } } @@ -683,7 +693,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); LOG.warn("Accept failed for channel " + channel, x); } @@ -722,7 +732,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable public void close() { LOG.debug("closed accept of {}", channel); - closeNoExceptions(channel); + IO.close(channel); } @Override @@ -735,7 +745,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); _selectorManager.onAcceptFailed(channel, x); LOG.debug(x); } @@ -758,7 +768,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable protected void failed(Throwable failure) { - closeNoExceptions(channel); + IO.close(channel); LOG.warn(String.valueOf(failure)); LOG.debug(failure); _selectorManager.onAcceptFailed(channel, failure); @@ -808,7 +818,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable if (failed.compareAndSet(false, true)) { timeout.cancel(); - closeNoExceptions(channel); + IO.close(channel); ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment); } } @@ -864,12 +874,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { if (_closed == null) { - closeNoExceptions(closeable); + IO.close(closeable); } else if (!_closed.contains(closeable)) { _closed.add(closeable); - closeNoExceptions(closeable); + IO.close(closeable); } } } @@ -894,12 +904,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { Object attachment = key.attachment(); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } _selector = null; - closeNoExceptions(selector); + IO.close(selector); _stopped.countDown(); } } @@ -924,7 +934,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable failure) { - closeNoExceptions(_connect.channel); + IO.close(_connect.channel); LOG.warn(String.valueOf(failure)); LOG.debug(failure); _connect.failed(failure); @@ -955,7 +965,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable Connection connection = endPoint.getConnection(); if (connection != null) _selectorManager.connectionClosed(connection); - _selectorManager.endPointClosed(endPoint); + ManagedSelector.this.endPointClosed(endPoint); } @Override diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java new file mode 100644 index 00000000000..5fc00669aff --- /dev/null +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java @@ -0,0 +1,395 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.test; + +import java.io.IOException; +import java.net.URI; +import java.nio.channels.Selector; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.Scheduler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FailedSelectorTest +{ + private static final Logger LOG = Log.getLogger(FailedSelectorTest.class); + private HttpClient client; + private Server server; + private StacklessLogging stacklessManagedSelector; + + @AfterEach + public void stopServerAndClient() throws Exception + { + server.stop(); + client.stop(); + stacklessManagedSelector.close(); + } + + @BeforeEach + public void startClient() throws Exception + { + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + QueuedThreadPool qtp = new QueuedThreadPool(); + qtp.setName("Client"); + qtp.setStopTimeout(1000); + client = new HttpClient(transport, null); + client.setExecutor(qtp); + + client.setIdleTimeout(1000); + client.setMaxConnectionsPerDestination(1); + client.setMaxRequestsQueuedPerDestination(1); + client.start(); + } + + public void startServer(Function customizeServerConsumer) throws Exception + { + stacklessManagedSelector = new StacklessLogging(ManagedSelector.class); + + server = new Server(); + server.setStopTimeout(1000); + server.setStopAtShutdown(true); + + ServerConnector connector = customizeServerConsumer.apply(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(HelloServlet.class, "/hello"); + + ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector)); + context.addServlet(closeHolder, "/selector/close"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @Test + public void testRestartServerOnSelectFailure() throws Exception + { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartServerTask(server, failedLatch)); + connector.setPort(0); + connector.setIdleTimeout(1000); + return connector; + }); + + // Request /hello + assertRequestHello(); + + // Request /selector/close + assertRequestSelectorClose(); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); + } + + @Test + public void testRestartSelectorOnSelectFailure() throws Exception + { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartSelectorTask(failedLatch)); + connector.setPort(0); + connector.setIdleTimeout(1000); + return connector; + }); + + // Request /hello + assertRequestHello(); + + // Request /selector/close + assertRequestSelectorClose(); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); + } + + private void assertRequestSelectorClose() throws InterruptedException, ExecutionException, TimeoutException + { + URI dest = server.getURI().resolve("/selector/close"); + LOG.info("Requesting GET on {}", dest); + + ContentResponse response = client.newRequest(dest) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat(dest + " response", response.getContentAsString(), startsWith("Closing selectors ")); + } + + private void assertRequestHello() throws InterruptedException, ExecutionException, TimeoutException + { + URI dest = server.getURI().resolve("/hello"); + LOG.info("Requesting GET on {}", dest); + ContentResponse response = client.newRequest(dest) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat(dest + " response", response.getContentAsString(), startsWith("Hello ")); + } + + public static class HelloServlet extends HttpServlet + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.getWriter().printf("Hello %s:%d%n", req.getRemoteAddr(), req.getRemotePort()); + } + } + + public static class CloseSelectorServlet extends HttpServlet + { + private static final int DELAY_MS = 500; + private ServerConnector connector; + private ScheduledExecutorService scheduledExecutorService; + + public CloseSelectorServlet(ServerConnector connector) + { + this.connector = connector; + scheduledExecutorService = Executors.newScheduledThreadPool(5); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.setHeader("Connection", "close"); + resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); + scheduledExecutorService.schedule(new ForceCloseSelectorTask(connector), DELAY_MS, TimeUnit.MILLISECONDS); + } + } + + public static class RestartSelectorCustomConnector extends ServerConnector + { + private final Consumer onSelectFailConsumer; + + public RestartSelectorCustomConnector(Server server, int acceptors, int selectors, Consumer onSelectFailConsumer) + { + super(server, acceptors, selectors); + this.onSelectFailConsumer = onSelectFailConsumer; + } + + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new CustomManagedSelector(this, id, onSelectFailConsumer); + } + }; + } + } + + public static class CustomManagedSelector extends ManagedSelector + { + private final Set endpoints = ConcurrentHashMap.newKeySet(); + private final Consumer onSelectFailConsumer; + + public CustomManagedSelector(SelectorManager selectorManager, int id, Consumer onSelectFailConsumer) + { + super(selectorManager, id); + this.onSelectFailConsumer = onSelectFailConsumer; + } + + @Override + protected void endPointOpened(EndPoint endPoint) + { + super.endPointOpened(endPoint); + endpoints.add(endPoint); + } + + @Override + protected void endPointClosed(EndPoint endPoint) + { + super.endPointClosed(endPoint); + endpoints.remove(endPoint); + } + + @Override + protected void onSelectFailed(Throwable cause) + { + endpoints.forEach((endpoint) -> + { + if (endpoint.getConnection() != null) + { + IO.close(endpoint.getConnection()); + } + IO.close(endpoint); + }); + endpoints.clear(); + + new Thread(() -> onSelectFailConsumer.accept(this), "OnSelectFailedTask").start(); + } + } + + private static class RestartSelectorTask implements Consumer + { + private static final Logger LOG = Log.getLogger(RestartSelectorTask.class); + private final CountDownLatch latch; + + public RestartSelectorTask(CountDownLatch latch) + { + this.latch = latch; + } + + @Override + public void accept(CustomManagedSelector customManagedSelector) + { + try + { + customManagedSelector.stop(); + customManagedSelector.start(); + } + catch (Exception e) + { + LOG.warn(e); + } + finally + { + latch.countDown(); + } + } + } + + private static class RestartServerTask implements Consumer + { + private static final Logger LOG = Log.getLogger(RestartServerTask.class); + private final Server server; + private final CountDownLatch latch; + + public RestartServerTask(Server server, CountDownLatch latch) + { + this.server = server; + this.latch = latch; + } + + @Override + public void accept(CustomManagedSelector customManagedSelector) + { + try + { + server.stop(); + server.start(); + } + catch (Exception e) + { + LOG.warn(e); + } + finally + { + latch.countDown(); + } + } + } + + private static class ForceCloseSelectorTask implements Runnable + { + private static final Logger LOG = Log.getLogger(ForceCloseSelectorTask.class); + private final ServerConnector connector; + + public ForceCloseSelectorTask(ServerConnector connector) + { + this.connector = connector; + } + + @Override + public void run() + { + SelectorManager selectorManager = connector.getSelectorManager(); + Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); + for (ManagedSelector managedSelector : managedSelectors) + { + if (managedSelector instanceof CustomManagedSelector) + { + CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; + Selector selector = customManagedSelector.getSelector(); + LOG.debug("Closing selector {}}", selector); + IO.close(selector); + } + } + } + } +} \ No newline at end of file diff --git a/tests/test-integration/src/test/resources/jetty-logging.properties b/tests/test-integration/src/test/resources/jetty-logging.properties index 1531468c2c1..fdc5a51caba 100644 --- a/tests/test-integration/src/test/resources/jetty-logging.properties +++ b/tests/test-integration/src/test/resources/jetty-logging.properties @@ -1,3 +1,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog +org.eclipse.jetty.LEVEL=WARN #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.websocket.LEVEL=DEBUG