Fixes #3989 - Selector restart with custom ManagedSelector

+ applying changed from PR review

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2019-08-20 09:36:35 -05:00
parent de9677c8af
commit d4c9b017e9
2 changed files with 58 additions and 22 deletions

View File

@ -106,9 +106,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
return _selector;
}
protected void onSelectFailed(Throwable cause)
protected void onSelectFailed(Throwable cause) throws IOException
{
LOG.warn(toString(), cause);
LOG.info("Restarting selector: " + toString(), cause);
startSelector();
}
public int size()
@ -279,7 +280,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
Selector selector = _selector;
if (isRunning())
{
onSelectFailed(x);
notifySelectFailed(x);
}
else
{
@ -363,6 +364,18 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private void notifySelectFailed(Throwable cause)
{
try
{
onSelectFailed(cause);
}
catch (IOException e)
{
LOG.info("Failure while calling onSelectFailed()", e);
}
}
private interface Product extends Runnable
{
}

View File

@ -2,15 +2,15 @@ package org.eclipse.jetty.test;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -27,12 +27,14 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.junit.After;
import org.junit.Before;
@ -43,21 +45,26 @@ import org.junit.runners.Parameterized;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class RebuildSelectorTest
{
@Parameterized.Parameters(name = "{0}")
public static List<ExecutionStrategy.Factory[]> params()
public static ExecutionStrategy.Factory[] params()
{
List<ExecutionStrategy.Factory[]> strategies = new ArrayList<>();
strategies.add(new ExecutionStrategy.Factory[]{new ExecuteProduceConsume.Factory()});
strategies.add(new ExecutionStrategy.Factory[]{new ProduceExecuteConsume.Factory()});
return strategies;
return new ExecutionStrategy.Factory[]
{
new ExecuteProduceConsume.Factory(),
new ProduceExecuteConsume.Factory(),
new ProduceConsume.Factory()
};
}
private HttpClient client;
private Server server;
@Parameterized.Parameter(0)
public ExecutionStrategy.Factory executionStrategy;
@After
public void stopServer() throws Exception
@ -80,19 +87,22 @@ public class RebuildSelectorTest
client.stop();
}
public RebuildSelectorTest(ExecutionStrategy.Factory strategyFactory) throws Exception
public void startServer(Function<Server, ServerConnector> customizeServerConsumer) throws Exception
{
server = new Server();
CustomServerConnector connector = new CustomServerConnector(server, strategyFactory, 1, 1);
connector.setPort(0);
ServerConnector connector = customizeServerConsumer.apply(server);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
context.addServlet(HelloServlet.class, "/hello");
context.addServlet(CloseSelectorServlet.class, "/selector/close");
context.setAttribute("connector", connector);
ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector));
context.addServlet(closeHolder, "/selector/close");
// ServletHolder asyncCloseHolder = new ServletHolder(new AsyncCloseSelectorServlet(connector));
// context.addServlet(asyncCloseHolder, "/selector/async-close");
HandlerList handlers = new HandlerList();
handlers.addHandler(context);
@ -106,6 +116,15 @@ public class RebuildSelectorTest
@Test
public void testRebuildServerSelector() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
startServer((server) ->
{
CustomServerConnector connector = new CustomServerConnector(server, executionStrategy, failedLatch, 1, 1);
connector.setPort(0);
return connector;
});
// Request /hello
assertRequestHello();
@ -113,7 +132,7 @@ public class RebuildSelectorTest
assertRequestSelectorClose();
// Wait for selectors to close from action above
TimeUnit.MILLISECONDS.sleep(1000);
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
// Request /hello
assertRequestHello();
@ -144,11 +163,13 @@ public class RebuildSelectorTest
public static class CustomServerConnector extends ServerConnector
{
private final ExecutionStrategy.Factory strategyFactory;
private final CountDownLatch failedLatch;
public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, int acceptors, int selectors)
public CustomServerConnector(Server server, ExecutionStrategy.Factory strategyFactory, CountDownLatch failedLatch, int acceptors, int selectors)
{
super(server, acceptors, selectors);
this.strategyFactory = strategyFactory;
this.failedLatch = failedLatch;
}
@Override
@ -165,7 +186,7 @@ public class RebuildSelectorTest
@Override
protected ManagedSelector newSelector(int id)
{
return new CustomManagedSelector(this, id, getExecutionStrategyFactory());
return new CustomManagedSelector(this, id, failedLatch, getExecutionStrategyFactory());
}
};
}
@ -174,10 +195,12 @@ public class RebuildSelectorTest
public static class CustomManagedSelector extends ManagedSelector
{
private static final Logger LOG = Log.getLogger(CustomManagedSelector.class);
private final CountDownLatch failedLatch;
public CustomManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
public CustomManagedSelector(SelectorManager selectorManager, int id, CountDownLatch failedLatch, ExecutionStrategy.Factory executionFactory)
{
super(selectorManager, id, executionFactory);
this.failedLatch = failedLatch;
}
@Override
@ -192,6 +215,7 @@ public class RebuildSelectorTest
{
LOG.warn(ex);
}
failedLatch.countDown();
}
}
@ -214,10 +238,9 @@ public class RebuildSelectorTest
private ScheduledExecutorService scheduledExecutorService;
private InterruptSelector interruptSelectorRunnable;
@Override
public void init()
public CloseSelectorServlet(ServerConnector connector)
{
connector = (ServerConnector)getServletContext().getAttribute("connector");
this.connector = connector;
scheduledExecutorService = Executors.newScheduledThreadPool(5);
interruptSelectorRunnable = new InterruptSelector();
}