Issue #3989 - Cleaning up ManagedSelector.doStop() for dead selector

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2019-09-06 14:54:01 -05:00
parent 0596d6c352
commit f8041b23bd
2 changed files with 25 additions and 126 deletions

View File

@ -116,6 +116,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// The normal strategy obtains the produced task, schedules
// a new thread to produce more, runs the task and then exits.
_selectorManager.execute(_strategy::produce);
// Set started only if we really are started
Start start = new Start();
submit(start);
start._started.await();
}
protected void onSelectFailed(Throwable cause) throws Exception
@ -151,7 +156,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
// doStop might be called for a failed managedSelector,
// We do not want to wait twice, so we only stop once for each start
if (_started.compareAndSet(true, false))
if (_started.compareAndSet(true, false) && _selector != null)
{
// Close connections, but only wait a single selector cycle for it to take effect
CloseConnections closeConnections = new CloseConnections();
@ -499,7 +504,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
catch (Throwable x)
{
Selector selector = _selector;
IO.close(_selector);
_selector = null;
if (isRunning())
{
LOG.warn("Fatal select() failure", x);
@ -510,8 +517,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.warn(x.toString());
LOG.debug(x);
}
_selector = null;
IO.close(selector);
}
return false;
}

View File

@ -30,10 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -56,10 +52,10 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -72,29 +68,30 @@ public class FailedSelectorTest
private static final Logger LOG = Log.getLogger(FailedSelectorTest.class);
private HttpClient client;
private Server server;
private AsyncCloseSelectorServlet asyncCloseSelectorServlet;
@AfterEach
public void stopServer() throws Exception
public void stopServerAndClient() throws Exception
{
LOG.info("Deathing Server");
server.stop();
LOG.info("Deathing Client");
client.stop();
}
@BeforeEach
public void startClient() throws Exception
{
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
QueuedThreadPool qtp = new QueuedThreadPool();
qtp.setName("Client");
qtp.setStopTimeout(1000);
client = new HttpClient(transport, null);
client.setIdleTimeout(1000);
client.setMaxConnectionsPerDestination(1);
client.setMaxRequestsQueuedPerDestination(1);
client.start();
}
client.setExecutor(qtp);
@AfterEach
public void stopClient() throws Exception
{
client.stop();
client.setIdleTimeout(1000);
// client.setMaxConnectionsPerDestination(1);
// client.setMaxRequestsQueuedPerDestination(1);
client.start();
}
public void startServer(Function<Server, ServerConnector> customizeServerConsumer) throws Exception
@ -113,11 +110,6 @@ public class FailedSelectorTest
ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector));
context.addServlet(closeHolder, "/selector/close");
asyncCloseSelectorServlet = new AsyncCloseSelectorServlet(connector);
ServletHolder asyncCloseHolder = new ServletHolder(asyncCloseSelectorServlet);
asyncCloseHolder.setAsyncSupported(true);
context.addServlet(asyncCloseHolder, "/selector/async-close");
HandlerList handlers = new HandlerList();
handlers.addHandler(context);
handlers.addHandler(new DefaultHandler());
@ -128,7 +120,7 @@ public class FailedSelectorTest
}
@Test
public void testRebuildServerSelectorNormal() throws Exception
public void testRestartServerOnSelectFailure() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
@ -152,37 +144,8 @@ public class FailedSelectorTest
// Request /hello
assertRequestHello();
}
@Test
@Disabled
public void testRebuildServerSelectorAsync() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
startServer((server) ->
{
CustomServerConnector connector = new CustomServerConnector(server, 1, 1, new RestartServerTask(server, failedLatch));
connector.setPort(0);
connector.setIdleTimeout(1000);
return connector;
});
// Request /hello
assertRequestHello();
// Request /selector/async-close
assertRequestSelectorClose("/selector/async-close");
// Wait for selectors to close from action above
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
LOG.info("Got failedLatch");
// Ensure that Async Listener onError was called
assertTrue(asyncCloseSelectorServlet.onErrorLatch.await(2, TimeUnit.SECONDS));
// Request /hello
assertRequestHello();
LOG.info("Test done");
}
private void assertRequestSelectorClose(String path) throws InterruptedException, ExecutionException, TimeoutException
@ -233,7 +196,7 @@ public class FailedSelectorTest
}
catch (Exception e)
{
e.printStackTrace();
LOG.warn(e);
}
finally
{
@ -344,73 +307,4 @@ public class FailedSelectorTest
scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS);
}
}
public static class AsyncCloseSelectorServlet extends HttpServlet
{
private static final int DELAY_MS = 200;
private ServerConnector connector;
private ScheduledExecutorService scheduledExecutorService;
public CountDownLatch onErrorLatch = new CountDownLatch(1);
public AsyncCloseSelectorServlet(ServerConnector connector)
{
this.connector = connector;
scheduledExecutorService = Executors.newScheduledThreadPool(5);
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
resp.setContentType("text/plain");
resp.setCharacterEncoding("utf-8");
ServletOutputStream out = resp.getOutputStream();
out.print("Closing selectors " + DELAY_MS);
AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0);
asyncContext.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}
@Override
public void onTimeout(AsyncEvent event)
{
}
@Override
public void onError(AsyncEvent event)
{
resp.setStatus(500);
event.getAsyncContext().complete();
onErrorLatch.countDown();
}
@Override
public void onStartAsync(AsyncEvent event)
{
}
});
scheduledExecutorService.schedule(new InterruptSelector(connector), DELAY_MS, TimeUnit.MILLISECONDS);
/* trigger EofException after selector close
scheduledExecutorService.schedule(() ->
{
byte[] b = new byte[128 * 1024 * 1024];
Arrays.fill(b, (byte)'x');
try
{
out.write(b);
out.flush();
}
catch (IOException e)
{
e.printStackTrace(System.out);
}
}, DELAY_MS * 2, TimeUnit.MILLISECONDS);
*/
}
}
}