From de9677c8af0061251b824ab44ae05ca278508631 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 19 Aug 2019 15:50:35 -0500 Subject: [PATCH 1/5] Fixes #3989 - Selector restart with custom ManagedSelector + Adds onSelectFailed(Throwable) + Adds startSelector() + Unit test to demonstrate behavior on both execution strategies Signed-off-by: Joakim Erdfelt --- .../org/eclipse/jetty/io/ManagedSelector.java | 27 +- .../jetty/test/RebuildSelectorTest.java | 254 ++++++++++++++++++ 2 files changed, 278 insertions(+), 3 deletions(-) create mode 100644 tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index b3780d26b33..8e3bb03b4b7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -87,6 +87,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump protected void doStart() throws Exception { super.doStart(); + startSelector(); + } + + protected void startSelector() throws IOException + { _selector = newSelector(); _selectorManager.execute(this); } @@ -96,12 +101,25 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump return Selector.open(); } + public Selector getSelector() + { + return _selector; + } + + protected void onSelectFailed(Throwable cause) + { + LOG.warn(toString(), cause); + } + public int size() { Selector s = _selector; if (s == null) return 0; - return s.keys().size(); + Set keys = s.keys(); + if (keys == null) + return 0; + return keys.size(); } @Override @@ -258,14 +276,17 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { + Selector selector = _selector; if (isRunning()) - LOG.warn(x); + { + onSelectFailed(x); + } else { LOG.warn(x.toString()); LOG.debug(x); } - closeNoExceptions(_selector); + closeNoExceptions(selector); } return false; } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java new file mode 100644 index 00000000000..42de51e68cf --- /dev/null +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java @@ -0,0 +1,254 @@ +package org.eclipse.jetty.test; + +import java.io.IOException; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; +import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; + +@RunWith(Parameterized.class) +public class RebuildSelectorTest +{ + @Parameterized.Parameters(name = "{0}") + public static List params() + { + List strategies = new ArrayList<>(); + strategies.add(new ExecutionStrategy.Factory[]{new ExecuteProduceConsume.Factory()}); + strategies.add(new ExecutionStrategy.Factory[]{new ProduceExecuteConsume.Factory()}); + return strategies; + } + + private HttpClient client; + private Server server; + + @After + public void stopServer() throws Exception + { + server.stop(); + } + + @Before + public void startClient() throws Exception + { + client = new HttpClient(); + client.setIdleTimeout(2000); + client.setMaxConnectionsPerDestination(1); + client.start(); + } + + @After + public void stopClient() throws Exception + { + client.stop(); + } + + public RebuildSelectorTest(ExecutionStrategy.Factory strategyFactory) throws Exception + { + server = new Server(); + + CustomServerConnector connector = new CustomServerConnector(server, strategyFactory, 1, 1); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(HelloServlet.class, "/hello"); + context.addServlet(CloseSelectorServlet.class, "/selector/close"); + context.setAttribute("connector", connector); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @Test + public void testRebuildServerSelector() throws Exception + { + // Request /hello + assertRequestHello(); + + // Request /selector/close + assertRequestSelectorClose(); + + // Wait for selectors to close from action above + TimeUnit.MILLISECONDS.sleep(1000); + + // Request /hello + assertRequestHello(); + } + + private void assertRequestSelectorClose() throws InterruptedException, ExecutionException, TimeoutException + { + ContentResponse response = client.newRequest(server.getURI().resolve("/selector/close")) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat("/selector/close status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat("/selector/close response", response.getContentAsString(), startsWith("Closing selectors ")); + } + + private void assertRequestHello() throws InterruptedException, ExecutionException, TimeoutException + { + ContentResponse response = client.newRequest(server.getURI().resolve("/hello")) + .method(HttpMethod.GET) + .header(HttpHeader.CONNECTION, "close") + .send(); + + assertThat("/hello status", response.getStatus(), is(HttpStatus.OK_200)); + assertThat("/hello response", response.getContentAsString(), startsWith("Hello ")); + } + + public static class CustomServerConnector extends ServerConnector + { + private final ExecutionStrategy.Factory strategyFactory; + + public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, int acceptors, int selectors) + { + super(server, acceptors, selectors); + this.strategyFactory = strategyFactory; + } + + @Override + public ExecutionStrategy.Factory getExecutionStrategyFactory() + { + return this.strategyFactory; + } + + @Override + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new ServerConnectorManager(executor, scheduler, selectors) + { + @Override + protected ManagedSelector newSelector(int id) + { + return new CustomManagedSelector(this, id, getExecutionStrategyFactory()); + } + }; + } + } + + public static class CustomManagedSelector extends ManagedSelector + { + private static final Logger LOG = Log.getLogger(CustomManagedSelector.class); + + public CustomManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory) + { + super(selectorManager, id, executionFactory); + } + + @Override + protected void onSelectFailed(Throwable cause) + { + try + { + LOG.debug("onSelectFailed()", cause); + this.startSelector(); + } + catch (Exception ex) + { + LOG.warn(ex); + } + } + } + + public static class HelloServlet extends HttpServlet + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.getWriter().printf("Hello %s:%d%n", req.getRemoteAddr(), req.getRemotePort()); + } + } + + public static class CloseSelectorServlet extends HttpServlet + { + private static final Logger LOG = Log.getLogger(CloseSelectorServlet.class); + private static final int DELAY_MS = 500; + private ServerConnector connector; + private ScheduledExecutorService scheduledExecutorService; + private InterruptSelector interruptSelectorRunnable; + + @Override + public void init() + { + connector = (ServerConnector)getServletContext().getAttribute("connector"); + scheduledExecutorService = Executors.newScheduledThreadPool(5); + interruptSelectorRunnable = new InterruptSelector(); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); + scheduledExecutorService.schedule(interruptSelectorRunnable, DELAY_MS, TimeUnit.MILLISECONDS); + } + + private class InterruptSelector implements Runnable + { + @Override + public void run() + { + SelectorManager selectorManager = connector.getSelectorManager(); + Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); + for (ManagedSelector managedSelector : managedSelectors) + { + if (managedSelector instanceof CustomManagedSelector) + { + CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; + Selector selector = customManagedSelector.getSelector(); + LOG.debug("Closing selector {}}", selector); + IO.close(selector); + } + } + } + } + } +} From d4c9b017e9bf9988a0c9a98fd2a4d5a0a4604f72 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 20 Aug 2019 09:36:35 -0500 Subject: [PATCH 2/5] Fixes #3989 - Selector restart with custom ManagedSelector + applying changed from PR review Signed-off-by: Joakim Erdfelt --- .../org/eclipse/jetty/io/ManagedSelector.java | 19 +++++- .../jetty/test/RebuildSelectorTest.java | 61 +++++++++++++------ 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 8e3bb03b4b7..011170884a6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -106,9 +106,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump return _selector; } - protected void onSelectFailed(Throwable cause) + protected void onSelectFailed(Throwable cause) throws IOException { - LOG.warn(toString(), cause); + LOG.info("Restarting selector: " + toString(), cause); + startSelector(); } public int size() @@ -279,7 +280,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump Selector selector = _selector; if (isRunning()) { - onSelectFailed(x); + notifySelectFailed(x); } else { @@ -363,6 +364,18 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } + private void notifySelectFailed(Throwable cause) + { + try + { + onSelectFailed(cause); + } + catch (IOException e) + { + LOG.info("Failure while calling onSelectFailed()", e); + } + } + private interface Product extends Runnable { } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java index 42de51e68cf..26d3f57fcdd 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java @@ -2,15 +2,15 @@ package org.eclipse.jetty.test; import java.io.IOException; import java.nio.channels.Selector; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -27,12 +27,14 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; +import org.eclipse.jetty.util.thread.strategy.ProduceConsume; import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; import org.junit.After; import org.junit.Before; @@ -43,21 +45,26 @@ import org.junit.runners.Parameterized; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) public class RebuildSelectorTest { @Parameterized.Parameters(name = "{0}") - public static List params() + public static ExecutionStrategy.Factory[] params() { - List strategies = new ArrayList<>(); - strategies.add(new ExecutionStrategy.Factory[]{new ExecuteProduceConsume.Factory()}); - strategies.add(new ExecutionStrategy.Factory[]{new ProduceExecuteConsume.Factory()}); - return strategies; + return new ExecutionStrategy.Factory[] + { + new ExecuteProduceConsume.Factory(), + new ProduceExecuteConsume.Factory(), + new ProduceConsume.Factory() + }; } private HttpClient client; private Server server; + @Parameterized.Parameter(0) + public ExecutionStrategy.Factory executionStrategy; @After public void stopServer() throws Exception @@ -80,19 +87,22 @@ public class RebuildSelectorTest client.stop(); } - public RebuildSelectorTest(ExecutionStrategy.Factory strategyFactory) throws Exception + public void startServer(Function customizeServerConsumer) throws Exception { server = new Server(); - CustomServerConnector connector = new CustomServerConnector(server, strategyFactory, 1, 1); - connector.setPort(0); + ServerConnector connector = customizeServerConsumer.apply(server); server.addConnector(connector); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); context.addServlet(HelloServlet.class, "/hello"); - context.addServlet(CloseSelectorServlet.class, "/selector/close"); - context.setAttribute("connector", connector); + + ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector)); + context.addServlet(closeHolder, "/selector/close"); + +// ServletHolder asyncCloseHolder = new ServletHolder(new AsyncCloseSelectorServlet(connector)); +// context.addServlet(asyncCloseHolder, "/selector/async-close"); HandlerList handlers = new HandlerList(); handlers.addHandler(context); @@ -106,6 +116,15 @@ public class RebuildSelectorTest @Test public void testRebuildServerSelector() throws Exception { + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + CustomServerConnector connector = new CustomServerConnector(server, executionStrategy, failedLatch, 1, 1); + connector.setPort(0); + return connector; + }); + // Request /hello assertRequestHello(); @@ -113,7 +132,7 @@ public class RebuildSelectorTest assertRequestSelectorClose(); // Wait for selectors to close from action above - TimeUnit.MILLISECONDS.sleep(1000); + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); // Request /hello assertRequestHello(); @@ -144,11 +163,13 @@ public class RebuildSelectorTest public static class CustomServerConnector extends ServerConnector { private final ExecutionStrategy.Factory strategyFactory; + private final CountDownLatch failedLatch; - public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, int acceptors, int selectors) + public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, CountDownLatch failedLatch, int acceptors, int selectors) { super(server, acceptors, selectors); this.strategyFactory = strategyFactory; + this.failedLatch = failedLatch; } @Override @@ -165,7 +186,7 @@ public class RebuildSelectorTest @Override protected ManagedSelector newSelector(int id) { - return new CustomManagedSelector(this, id, getExecutionStrategyFactory()); + return new CustomManagedSelector(this, id, failedLatch, getExecutionStrategyFactory()); } }; } @@ -174,10 +195,12 @@ public class RebuildSelectorTest public static class CustomManagedSelector extends ManagedSelector { private static final Logger LOG = Log.getLogger(CustomManagedSelector.class); + private final CountDownLatch failedLatch; - public CustomManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory) + public CustomManagedSelector(SelectorManager selectorManager, int id, CountDownLatch failedLatch, ExecutionStrategy.Factory executionFactory) { super(selectorManager, id, executionFactory); + this.failedLatch = failedLatch; } @Override @@ -192,6 +215,7 @@ public class RebuildSelectorTest { LOG.warn(ex); } + failedLatch.countDown(); } } @@ -214,10 +238,9 @@ public class RebuildSelectorTest private ScheduledExecutorService scheduledExecutorService; private InterruptSelector interruptSelectorRunnable; - @Override - public void init() + public CloseSelectorServlet(ServerConnector connector) { - connector = (ServerConnector)getServletContext().getAttribute("connector"); + this.connector = connector; scheduledExecutorService = Executors.newScheduledThreadPool(5); interruptSelectorRunnable = new InterruptSelector(); } From e2ee75a40efbeb695357072ee9ee938414cf5b88 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 20 Aug 2019 09:59:53 -0500 Subject: [PATCH 3/5] Fixes #3989 - Adding AsyncClose onError test Requested from PR review by @gregw Signed-off-by: Joakim Erdfelt --- .../jetty/test/RebuildSelectorTest.java | 173 +++++++++++++++--- 1 file changed, 144 insertions(+), 29 deletions(-) diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java index 26d3f57fcdd..f769ec9137a 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java @@ -11,6 +11,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -38,6 +42,7 @@ import org.eclipse.jetty.util.thread.strategy.ProduceConsume; import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -51,19 +56,24 @@ import static org.junit.Assert.assertTrue; public class RebuildSelectorTest { @Parameterized.Parameters(name = "{0}") - public static ExecutionStrategy.Factory[] params() + public static Object[][] params() { - return new ExecutionStrategy.Factory[] + return new Object[][] { - new ExecuteProduceConsume.Factory(), - new ProduceExecuteConsume.Factory(), - new ProduceConsume.Factory() + {"EPC", new ExecuteProduceConsume.Factory()}, + {"PEC", new ProduceExecuteConsume.Factory()}, + {"PC", new ProduceConsume.Factory()} }; } private HttpClient client; private Server server; + private AsyncCloseSelectorServlet asyncCloseSelectorServlet; + @Parameterized.Parameter(0) + public String testMode; + + @Parameterized.Parameter(1) public ExecutionStrategy.Factory executionStrategy; @After @@ -101,8 +111,10 @@ public class RebuildSelectorTest ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector)); context.addServlet(closeHolder, "/selector/close"); -// ServletHolder asyncCloseHolder = new ServletHolder(new AsyncCloseSelectorServlet(connector)); -// context.addServlet(asyncCloseHolder, "/selector/async-close"); + asyncCloseSelectorServlet = new AsyncCloseSelectorServlet(connector); + ServletHolder asyncCloseHolder = new ServletHolder(asyncCloseSelectorServlet); + asyncCloseHolder.setAsyncSupported(true); + context.addServlet(asyncCloseHolder, "/selector/async-close"); HandlerList handlers = new HandlerList(); handlers.addHandler(context); @@ -114,7 +126,7 @@ public class RebuildSelectorTest } @Test - public void testRebuildServerSelector() throws Exception + public void testRebuildServerSelectorNormal() throws Exception { CountDownLatch failedLatch = new CountDownLatch(1); @@ -129,7 +141,7 @@ public class RebuildSelectorTest assertRequestHello(); // Request /selector/close - assertRequestSelectorClose(); + assertRequestSelectorClose("/selector/close"); // Wait for selectors to close from action above assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); @@ -138,9 +150,38 @@ public class RebuildSelectorTest assertRequestHello(); } - private void assertRequestSelectorClose() throws InterruptedException, ExecutionException, TimeoutException + @Test + @Ignore + public void testRebuildServerSelectorAsync() throws Exception { - ContentResponse response = client.newRequest(server.getURI().resolve("/selector/close")) + CountDownLatch failedLatch = new CountDownLatch(1); + + startServer((server) -> + { + CustomServerConnector connector = new CustomServerConnector(server, executionStrategy, failedLatch, 1, 1); + connector.setPort(0); + return connector; + }); + + // Request /hello + assertRequestHello(); + + // Request /selector/async-close + assertRequestSelectorClose("/selector/async-close"); + + // Wait for selectors to close from action above + assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); + + // Ensure that Async Listener onError was called + assertTrue(asyncCloseSelectorServlet.onErrorLatch.await(2, TimeUnit.SECONDS)); + + // Request /hello + assertRequestHello(); + } + + private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException + { + ContentResponse response = client.newRequest(server.getURI().resolve(path)) .method(HttpMethod.GET) .header(HttpHeader.CONNECTION, "close") .send(); @@ -230,19 +271,44 @@ public class RebuildSelectorTest } } + private static class InterruptSelector implements Runnable + { + private static final Logger LOG = Log.getLogger(InterruptSelector.class); + private final ServerConnector connector; + + public InterruptSelector(ServerConnector connector) + { + this.connector = connector; + } + + @Override + public void run() + { + SelectorManager selectorManager = connector.getSelectorManager(); + Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); + for (ManagedSelector managedSelector : managedSelectors) + { + if (managedSelector instanceof CustomManagedSelector) + { + CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; + Selector selector = customManagedSelector.getSelector(); + LOG.debug("Closing selector {}}", selector); + IO.close(selector); + } + } + } + } + public static class CloseSelectorServlet extends HttpServlet { - private static final Logger LOG = Log.getLogger(CloseSelectorServlet.class); private static final int DELAY_MS = 500; private ServerConnector connector; private ScheduledExecutorService scheduledExecutorService; - private InterruptSelector interruptSelectorRunnable; public CloseSelectorServlet(ServerConnector connector) { this.connector = connector; scheduledExecutorService = Executors.newScheduledThreadPool(5); - interruptSelectorRunnable = new InterruptSelector(); } @Override @@ -251,27 +317,76 @@ public class RebuildSelectorTest resp.setContentType("text/plain"); resp.setCharacterEncoding("utf-8"); resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); - scheduledExecutorService.schedule(interruptSelectorRunnable, DELAY_MS, TimeUnit.MILLISECONDS); + scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); + } + } + + public static class AsyncCloseSelectorServlet extends HttpServlet + { + private static final int DELAY_MS = 200; + private ServerConnector connector; + private ScheduledExecutorService scheduledExecutorService; + public CountDownLatch onErrorLatch = new CountDownLatch(1); + + public AsyncCloseSelectorServlet(ServerConnector connector) + { + this.connector = connector; + scheduledExecutorService = Executors.newScheduledThreadPool(5); } - private class InterruptSelector implements Runnable + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { - @Override - public void run() + resp.setContentType("text/plain"); + resp.setCharacterEncoding("utf-8"); + ServletOutputStream out = resp.getOutputStream(); + out.print("Closing selectors " + DELAY_MS); + + AsyncContext asyncContext = req.startAsync(); + asyncContext.setTimeout(0); + asyncContext.addListener(new AsyncListener() { - SelectorManager selectorManager = connector.getSelectorManager(); - Collection managedSelectors = selectorManager.getBeans(ManagedSelector.class); - for (ManagedSelector managedSelector : managedSelectors) + @Override + public void onComplete(AsyncEvent event) { - if (managedSelector instanceof CustomManagedSelector) - { - CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector; - Selector selector = customManagedSelector.getSelector(); - LOG.debug("Closing selector {}}", selector); - IO.close(selector); - } } - } + + @Override + public void onTimeout(AsyncEvent event) + { + } + + @Override + public void onError(AsyncEvent event) + { + resp.setStatus(500); + event.getAsyncContext().complete(); + onErrorLatch.countDown(); + } + + @Override + public void onStartAsync(AsyncEvent event) + { + } + }); + + scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); + /* trigger EofException after selector close + scheduledExecutorService.schedule(() -> + { + byte[] b = new byte[128 * 1024 * 1024]; + Arrays.fill(b, (byte)'x'); + try + { + out.write(b); + out.flush(); + } + catch (IOException e) + { + e.printStackTrace(System.out); + } + }, DELAY_MS * 2, TimeUnit.MILLISECONDS); + */ } } } From 1ea4320a65761091cfa88a253e9bd1549ef2a981 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 20 Aug 2019 10:00:49 -0500 Subject: [PATCH 4/5] Adding missing license header Signed-off-by: Joakim Erdfelt --- .../jetty/test/RebuildSelectorTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java index f769ec9137a..365b90d1d1b 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/RebuildSelectorTest.java @@ -1,3 +1,21 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.test; import java.io.IOException; From 669253bf6dd3046e5461f93c2a071a21097ceae3 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 20 Aug 2019 10:03:29 -0500 Subject: [PATCH 5/5] Fixes #3989 - Using IO.close(Closable) instead Signed-off-by: Joakim Erdfelt --- .../org/eclipse/jetty/io/ManagedSelector.java | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 011170884a6..78a8171aa4a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -287,7 +288,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.warn(x.toString()); LOG.debug(x); } - closeNoExceptions(selector); + IO.close(selector); } return false; } @@ -328,13 +329,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { LOG.debug("Ignoring cancelled key for channel {}", key.channel()); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } catch (Throwable x) { LOG.warn("Could not process key for channel " + key.channel(), x); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } else @@ -343,7 +344,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); Object attachment = key.attachment(); if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); + IO.close((EndPoint)attachment); } } return null; @@ -434,24 +435,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); LOG.warn("Accept failed for channel " + channel, x); } } - private void closeNoExceptions(Closeable closeable) - { - try - { - if (closeable != null) - closeable.close(); - } - catch (Throwable x) - { - LOG.ignore(x); - } - } - private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); @@ -572,7 +560,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(_channel); + IO.close(_channel); LOG.warn(x); } } @@ -593,7 +581,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump public void close() { LOG.debug("closed accept of {}", channel); - closeNoExceptions(channel); + IO.close(channel); } @Override @@ -606,7 +594,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } catch (Throwable x) { - closeNoExceptions(channel); + IO.close(channel); LOG.debug(x); } } @@ -641,12 +629,12 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump public void close() { LOG.debug("closed creation of {}", channel); - closeNoExceptions(channel); + IO.close(channel); } protected void failed(Throwable failure) { - closeNoExceptions(channel); + IO.close(channel); LOG.debug(failure); } } @@ -683,7 +671,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump if (failed.compareAndSet(false, true)) { timeout.cancel(); - closeNoExceptions(channel); + IO.close(channel); ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment); } } @@ -772,7 +760,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump @Override public void run() { - closeNoExceptions(_endPoint.getConnection()); + IO.close(_endPoint.getConnection()); _latch.countDown(); } } @@ -786,7 +774,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { Selector selector = _selector; _selector = null; - closeNoExceptions(selector); + IO.close(selector); _latch.countDown(); }