Issue #3989 - Tests for both Restart Server and Selector

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2019-09-06 16:10:30 -05:00
parent f8041b23bd
commit b7b744160f
2 changed files with 199 additions and 109 deletions

View File

@ -267,6 +267,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
protected void endPointOpened(EndPoint endPoint)
{
_selectorManager.endPointOpened(endPoint);
}
protected void endPointClosed(EndPoint endPoint)
{
_selectorManager.endPointClosed(endPoint);
}
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
{ {
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
@ -274,7 +284,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
endPoint.setConnection(connection); endPoint.setConnection(connection);
selectionKey.attach(endPoint); selectionKey.attach(endPoint);
endPoint.onOpen(); endPoint.onOpen();
_selectorManager.endPointOpened(endPoint); endPointOpened(endPoint);
_selectorManager.connectionOpened(connection); _selectorManager.connectionOpened(connection);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Created {}", endPoint); LOG.debug("Created {}", endPoint);
@ -967,7 +977,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
Connection connection = endPoint.getConnection(); Connection connection = endPoint.getConnection();
if (connection != null) if (connection != null)
_selectorManager.connectionClosed(connection); _selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint); ManagedSelector.this.endPointClosed(endPoint);
} }
@Override @Override

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -29,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -41,6 +44,7 @@ import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -72,9 +76,7 @@ public class FailedSelectorTest
@AfterEach @AfterEach
public void stopServerAndClient() throws Exception public void stopServerAndClient() throws Exception
{ {
LOG.info("Deathing Server");
server.stop(); server.stop();
LOG.info("Deathing Client");
client.stop(); client.stop();
} }
@ -89,8 +91,8 @@ public class FailedSelectorTest
client.setExecutor(qtp); client.setExecutor(qtp);
client.setIdleTimeout(1000); client.setIdleTimeout(1000);
// client.setMaxConnectionsPerDestination(1); client.setMaxConnectionsPerDestination(1);
// client.setMaxRequestsQueuedPerDestination(1); client.setMaxRequestsQueuedPerDestination(1);
client.start(); client.start();
} }
@ -126,7 +128,7 @@ public class FailedSelectorTest
startServer((server) -> startServer((server) ->
{ {
CustomServerConnector connector = new CustomServerConnector(server, 1, 1, new RestartServerTask(server, failedLatch)); RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartServerTask(server, failedLatch));
connector.setPort(0); connector.setPort(0);
connector.setIdleTimeout(1000); connector.setIdleTimeout(1000);
return connector; return connector;
@ -140,12 +142,35 @@ public class FailedSelectorTest
// Wait for selectors to close from action above // Wait for selectors to close from action above
assertTrue(failedLatch.await(2, TimeUnit.SECONDS)); assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
LOG.info("Got failedLatch");
// Request /hello
assertRequestHello();
}
@Test
public void testRestartSelectorOnSelectFailure() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
startServer((server) ->
{
RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartSelectorTask(failedLatch));
connector.setPort(0);
connector.setIdleTimeout(1000);
return connector;
});
// Request /hello // Request /hello
assertRequestHello(); assertRequestHello();
LOG.info("Test done"); // Request /selector/close
assertRequestSelectorClose("/selector/close");
// Wait for selectors to close from action above
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
// Request /hello
assertRequestHello();
} }
private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException
@ -175,77 +200,6 @@ public class FailedSelectorTest
assertThat("/hello response", response.getContentAsString(), startsWith("Hello ")); assertThat("/hello response", response.getContentAsString(), startsWith("Hello "));
} }
public static class RestartServerTask implements Runnable
{
private final Server server;
private final CountDownLatch latch;
public RestartServerTask(Server server, CountDownLatch latch)
{
this.server = server;
this.latch = latch;
}
@Override
public void run()
{
try
{
server.stop();
server.start();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
latch.countDown();
}
}
}
public static class CustomServerConnector extends ServerConnector
{
private final Runnable onSelectFailureTask;
public CustomServerConnector(Server server, int acceptors, int selectors, Runnable onSelectFailureTask)
{
super(server, acceptors, selectors);
this.onSelectFailureTask = onSelectFailureTask;
}
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new CustomManagedSelector(this, id, onSelectFailureTask);
}
};
}
}
public static class CustomManagedSelector extends ManagedSelector
{
private final Runnable onSelectFailureTask;
public CustomManagedSelector(SelectorManager selectorManager, int id, Runnable onSelectFailureTask)
{
super(selectorManager, id);
this.onSelectFailureTask = onSelectFailureTask;
}
@Override
protected void onSelectFailed(Throwable cause)
{
new Thread(onSelectFailureTask, "onSelectFailedTask").start();
}
}
public static class HelloServlet extends HttpServlet public static class HelloServlet extends HttpServlet
{ {
@Override @Override
@ -257,34 +211,6 @@ public class FailedSelectorTest
} }
} }
private static class InterruptSelector implements Runnable
{
private static final Logger LOG = Log.getLogger(InterruptSelector.class);
private final ServerConnector connector;
public InterruptSelector(ServerConnector connector)
{
this.connector = connector;
}
@Override
public void run()
{
SelectorManager selectorManager = connector.getSelectorManager();
Collection<ManagedSelector> managedSelectors = selectorManager.getBeans(ManagedSelector.class);
for (ManagedSelector managedSelector : managedSelectors)
{
if (managedSelector instanceof CustomManagedSelector)
{
CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector;
Selector selector = customManagedSelector.getSelector();
LOG.debug("Closing selector {}}", selector);
IO.close(selector);
}
}
}
}
public static class CloseSelectorServlet extends HttpServlet public static class CloseSelectorServlet extends HttpServlet
{ {
private static final int DELAY_MS = 500; private static final int DELAY_MS = 500;
@ -304,7 +230,161 @@ public class FailedSelectorTest
resp.setCharacterEncoding("utf-8"); resp.setCharacterEncoding("utf-8");
resp.setHeader("Connection", "close"); resp.setHeader("Connection", "close");
resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS); resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS);
scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS); scheduledExecutorService.schedule(new InterruptSelectorTask(connector), DELAY_MS, TimeUnit.MILLISECONDS);
}
}
public static class RestartSelectorCustomConnector extends ServerConnector
{
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
public RestartSelectorCustomConnector(Server server, int acceptors, int selectors, Consumer<CustomManagedSelector> onSelectFailConsumer)
{
super(server, acceptors, selectors);
this.onSelectFailConsumer = onSelectFailConsumer;
}
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new CustomManagedSelector(this, id, onSelectFailConsumer);
}
};
}
}
public static class CustomManagedSelector extends ManagedSelector
{
private final Set<EndPoint> endpoints = ConcurrentHashMap.newKeySet();
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
public CustomManagedSelector(SelectorManager selectorManager, int id, Consumer<CustomManagedSelector> onSelectFailConsumer)
{
super(selectorManager, id);
this.onSelectFailConsumer = onSelectFailConsumer;
}
@Override
protected void endPointOpened(EndPoint endPoint)
{
super.endPointOpened(endPoint);
endpoints.add(endPoint);
}
@Override
protected void endPointClosed(EndPoint endPoint)
{
super.endPointClosed(endPoint);
endpoints.remove(endPoint);
}
@Override
protected void onSelectFailed(Throwable cause)
{
endpoints.forEach((endpoint) ->
{
if (endpoint.getConnection() != null)
{
IO.close(endpoint.getConnection());
}
IO.close(endpoint);
});
endpoints.clear();
new Thread(() -> onSelectFailConsumer.accept(this), "OnSelectFailedTask").start();
}
}
private static class RestartSelectorTask implements Consumer<CustomManagedSelector>
{
private static final Logger LOG = Log.getLogger(RestartSelectorTask.class);
private final CountDownLatch latch;
public RestartSelectorTask(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public void accept(CustomManagedSelector customManagedSelector)
{
try
{
customManagedSelector.stop();
customManagedSelector.start();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
latch.countDown();
}
}
}
private static class RestartServerTask implements Consumer<CustomManagedSelector>
{
private static final Logger LOG = Log.getLogger(RestartServerTask.class);
private final Server server;
private final CountDownLatch latch;
public RestartServerTask(Server server, CountDownLatch latch)
{
this.server = server;
this.latch = latch;
}
@Override
public void accept(CustomManagedSelector customManagedSelector)
{
try
{
server.stop();
server.start();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
latch.countDown();
}
}
}
private static class InterruptSelectorTask implements Runnable
{
private static final Logger LOG = Log.getLogger(InterruptSelectorTask.class);
private final ServerConnector connector;
public InterruptSelectorTask(ServerConnector connector)
{
this.connector = connector;
}
@Override
public void run()
{
SelectorManager selectorManager = connector.getSelectorManager();
Collection<ManagedSelector> managedSelectors = selectorManager.getBeans(ManagedSelector.class);
for (ManagedSelector managedSelector : managedSelectors)
{
if (managedSelector instanceof CustomManagedSelector)
{
CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector;
Selector selector = customManagedSelector.getSelector();
LOG.debug("Closing selector {}}", selector);
IO.close(selector);
}
}
} }
} }
} }