From b7b744160f7de81e7a0d9f63851406daaa4c0da3 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Fri, 6 Sep 2019 16:10:30 -0500 Subject: [PATCH] Issue #3989 - Tests for both Restart Server and Selector Signed-off-by: Joakim Erdfelt --- .../org/eclipse/jetty/io/ManagedSelector.java | 14 +- .../jetty/test/FailedSelectorTest.java | 294 +++++++++++------- 2 files changed, 199 insertions(+), 109 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 5628ffc90b8..35e9da322af 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -267,6 +267,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } + protected void endPointOpened(EndPoint endPoint) + { + _selectorManager.endPointOpened(endPoint); + } + + protected void endPointClosed(EndPoint endPoint) + { + _selectorManager.endPointClosed(endPoint); + } + private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException { EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); @@ -274,7 +284,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable endPoint.setConnection(connection); selectionKey.attach(endPoint); endPoint.onOpen(); - _selectorManager.endPointOpened(endPoint); + endPointOpened(endPoint); _selectorManager.connectionOpened(connection); if (LOG.isDebugEnabled()) LOG.debug("Created {}", endPoint); @@ -967,7 +977,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable Connection connection = endPoint.getConnection(); if (connection != null) _selectorManager.connectionClosed(connection); - _selectorManager.endPointClosed(endPoint); + ManagedSelector.this.endPointClosed(endPoint); } @Override diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java index 371eb208896..259e2c72550 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.net.URI; import java.nio.channels.Selector; import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -29,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Function; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -41,6 +44,7 @@ import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.Server; @@ -72,9 +76,7 @@ public class FailedSelectorTest @AfterEach public void stopServerAndClient() throws Exception { - LOG.info("Deathing Server"); server.stop(); - LOG.info("Deathing Client"); client.stop(); } @@ -89,8 +91,8 @@ public class FailedSelectorTest client.setExecutor(qtp); client.setIdleTimeout(1000); -// client.setMaxConnectionsPerDestination(1); -// client.setMaxRequestsQueuedPerDestination(1); + client.setMaxConnectionsPerDestination(1); + client.setMaxRequestsQueuedPerDestination(1); client.start(); } @@ -126,7 +128,7 @@ public class FailedSelectorTest startServer((server) -> { - CustomServerConnector connector = new CustomServerConnector(server, 1, 1, new RestartServerTask(server, failedLatch)); + RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartServerTask(server, failedLatch)); connector.setPort(0); connector.setIdleTimeout(1000); return connector; @@ -140,12 +142,35 @@ public class FailedSelectorTest // Wait for selectors to close from action above assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); - LOG.info("Got failedLatch"); + + // Request /hello + assertRequestHello(); + } + + @Test + public void testRestartSelectorOnSelectFailure() throws Exception + { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartSelectorTask(failedLatch)); + connector.setPort(0); + connector.setIdleTimeout(1000); + return connector; + }); // Request /hello assertRequestHello(); - LOG.info("Test done"); + // Request /selector/close + assertRequestSelectorClose("/selector/close"); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); } private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException @@ -175,77 +200,6 @@ public class FailedSelectorTest assertThat("/hello response", response.getContentAsString(), startsWith("Hello ")); } - public static class RestartServerTask implements Runnable - { - private final Server server; - private final CountDownLatch latch; - - public RestartServerTask(Server server, CountDownLatch latch) - { - this.server = server; - this.latch = latch; - } - - @Override - public void run() - { - try - { - server.stop(); - server.start(); - } - catch (Exception e) - { - LOG.warn(e); - } - finally - { - latch.countDown(); - } - } - } - - public static class CustomServerConnector extends ServerConnector - { - private final Runnable onSelectFailureTask; - - public CustomServerConnector(Server server, int acceptors, int selectors, Runnable onSelectFailureTask) - { - super(server, acceptors, selectors); - this.onSelectFailureTask = onSelectFailureTask; - } - - @Override - protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) - { - return new ServerConnectorManager(executor, scheduler, selectors) - { - @Override - protected ManagedSelector newSelector(int id) - { - return new CustomManagedSelector(this, id, onSelectFailureTask); - } - }; - } - } - - public static class CustomManagedSelector extends ManagedSelector - { - private final Runnable onSelectFailureTask; - - public CustomManagedSelector(SelectorManager selectorManager, int id, Runnable onSelectFailureTask) - { - super(selectorManager, id); - this.onSelectFailureTask = onSelectFailureTask; - } - - @Override - protected void onSelectFailed(Throwable cause) - { - new Thread(onSelectFailureTask, "onSelectFailedTask").start(); - } - } - public static class HelloServlet extends HttpServlet { @Override @@ -257,34 +211,6 @@ public class FailedSelectorTest } } - private static class InterruptSelector implements Runnable - { - private static final Logger LOG = Log.getLogger(InterruptSelector.class); - private final ServerConnector connector; - - public InterruptSelector(ServerConnector connector) - { - this.connector = connector; - } - - @Override - public void run() - { - SelectorManager selectorManager = connector.getSelectorManager(); - Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); - for (ManagedSelector managedSelector : managedSelectors) - { - if (managedSelector instanceof CustomManagedSelector) - { - CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; - Selector selector = customManagedSelector.getSelector(); - LOG.debug("Closing selector {}}", selector); - IO.close(selector); - } - } - } - } - public static class CloseSelectorServlet extends HttpServlet { private static final int DELAY_MS = 500; @@ -304,7 +230,161 @@ public class FailedSelectorTest resp.setCharacterEncoding("utf-8"); resp.setHeader("Connection", "close"); resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); - scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); + scheduledExecutorService.schedule(new InterruptSelectorTask(connector), DELAY_MS, TimeUnit.MILLISECONDS); + } + } + + public static class RestartSelectorCustomConnector extends ServerConnector + { + private final Consumer onSelectFailConsumer; + + public RestartSelectorCustomConnector(Server server, int acceptors, int selectors, Consumer onSelectFailConsumer) + { + super(server, acceptors, selectors); + this.onSelectFailConsumer = onSelectFailConsumer; + } + + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new CustomManagedSelector(this, id, onSelectFailConsumer); + } + }; + } + } + + public static class CustomManagedSelector extends ManagedSelector + { + private final Set endpoints = ConcurrentHashMap.newKeySet(); + private final Consumer onSelectFailConsumer; + + public CustomManagedSelector(SelectorManager selectorManager, int id, Consumer onSelectFailConsumer) + { + super(selectorManager, id); + this.onSelectFailConsumer = onSelectFailConsumer; + } + + @Override + protected void endPointOpened(EndPoint endPoint) + { + super.endPointOpened(endPoint); + endpoints.add(endPoint); + } + + @Override + protected void endPointClosed(EndPoint endPoint) + { + super.endPointClosed(endPoint); + endpoints.remove(endPoint); + } + + @Override + protected void onSelectFailed(Throwable cause) + { + endpoints.forEach((endpoint) -> + { + if (endpoint.getConnection() != null) + { + IO.close(endpoint.getConnection()); + } + IO.close(endpoint); + }); + endpoints.clear(); + + new Thread(() -> onSelectFailConsumer.accept(this), "OnSelectFailedTask").start(); + } + } + + private static class RestartSelectorTask implements Consumer + { + private static final Logger LOG = Log.getLogger(RestartSelectorTask.class); + private final CountDownLatch latch; + + public RestartSelectorTask(CountDownLatch latch) + { + this.latch = latch; + } + + @Override + public void accept(CustomManagedSelector customManagedSelector) + { + try + { + customManagedSelector.stop(); + customManagedSelector.start(); + } + catch (Exception e) + { + LOG.warn(e); + } + finally + { + latch.countDown(); + } + } + } + + private static class RestartServerTask implements Consumer + { + private static final Logger LOG = Log.getLogger(RestartServerTask.class); + private final Server server; + private final CountDownLatch latch; + + public RestartServerTask(Server server, CountDownLatch latch) + { + this.server = server; + this.latch = latch; + } + + @Override + public void accept(CustomManagedSelector customManagedSelector) + { + try + { + server.stop(); + server.start(); + } + catch (Exception e) + { + LOG.warn(e); + } + finally + { + latch.countDown(); + } + } + } + + private static class InterruptSelectorTask implements Runnable + { + private static final Logger LOG = Log.getLogger(InterruptSelectorTask.class); + private final ServerConnector connector; + + public InterruptSelectorTask(ServerConnector connector) + { + this.connector = connector; + } + + @Override + public void run() + { + SelectorManager selectorManager = connector.getSelectorManager(); + Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); + for (ManagedSelector managedSelector : managedSelectors) + { + if (managedSelector instanceof CustomManagedSelector) + { + CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; + Selector selector = customManagedSelector.getSelector(); + LOG.debug("Closing selector {}}", selector); + IO.close(selector); + } + } } } } \ No newline at end of file