Merge pull request #4054 from eclipse/jetty-9.4.x-3989-selector-failure

Issue #3989 - add new onSelectFailure method overridable
This commit is contained in:
Joakim Erdfelt 2019-09-10 08:02:46 -05:00 committed by GitHub
commit 115ccf11ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 437 additions and 31 deletions

View File

@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.component.DumpableCollection;
@ -122,12 +123,20 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
start._started.await(); start._started.await();
} }
protected void onSelectFailed(Throwable cause)
{
// override to change behavior
}
public int size() public int size()
{ {
Selector s = _selector; Selector s = _selector;
if (s == null) if (s == null)
return 0; return 0;
return s.keys().size(); Set<SelectionKey> keys = s.keys();
if (keys == null)
return 0;
return keys.size();
} }
@Override @Override
@ -135,7 +144,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
// doStop might be called for a failed managedSelector, // doStop might be called for a failed managedSelector,
// We do not want to wait twice, so we only stop once for each start // We do not want to wait twice, so we only stop once for each start
if (_started.compareAndSet(true, false)) if (_started.compareAndSet(true, false) && _selector != null)
{ {
// Close connections, but only wait a single selector cycle for it to take effect // Close connections, but only wait a single selector cycle for it to take effect
CloseConnections closeConnections = new CloseConnections(); CloseConnections closeConnections = new CloseConnections();
@ -210,7 +219,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
catch (RejectedExecutionException x) catch (RejectedExecutionException x)
{ {
if (task instanceof Closeable) if (task instanceof Closeable)
closeNoExceptions((Closeable)task); IO.close((Closeable)task);
} }
} }
@ -246,17 +255,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private static void closeNoExceptions(Closeable closeable) protected void endPointOpened(EndPoint endPoint)
{ {
try _selectorManager.endPointOpened(endPoint);
{ }
if (closeable != null)
closeable.close(); protected void endPointClosed(EndPoint endPoint)
} {
catch (Throwable x) _selectorManager.endPointClosed(endPoint);
{
LOG.ignore(x);
}
} }
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
@ -266,7 +272,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
endPoint.setConnection(connection); endPoint.setConnection(connection);
selectionKey.attach(endPoint); selectionKey.attach(endPoint);
endPoint.onOpen(); endPoint.onOpen();
_selectorManager.endPointOpened(endPoint); endPointOpened(endPoint);
_selectorManager.connectionOpened(connection); _selectorManager.connectionOpened(connection);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Created {}", endPoint); LOG.debug("Created {}", endPoint);
@ -496,15 +502,19 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
IO.close(_selector);
_selector = null; _selector = null;
if (isRunning()) if (isRunning())
LOG.warn(x); {
LOG.warn("Fatal select() failure", x);
onSelectFailed(x);
}
else else
{ {
LOG.warn(x.toString()); LOG.warn(x.toString());
LOG.debug(x); LOG.debug(x);
} }
closeNoExceptions(_selector);
} }
return false; return false;
} }
@ -541,13 +551,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
LOG.debug("Ignoring cancelled key for channel {}", key.channel()); LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment); IO.close((EndPoint)attachment);
} }
catch (Throwable x) catch (Throwable x)
{ {
LOG.warn("Could not process key for channel " + key.channel(), x); LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment); IO.close((EndPoint)attachment);
} }
} }
else else
@ -556,7 +566,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment); IO.close((EndPoint)attachment);
} }
} }
return null; return null;
@ -661,7 +671,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
closeNoExceptions(_channel); IO.close(_channel);
LOG.warn(x); LOG.warn(x);
} }
} }
@ -683,7 +693,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
closeNoExceptions(channel); IO.close(channel);
LOG.warn("Accept failed for channel " + channel, x); LOG.warn("Accept failed for channel " + channel, x);
} }
@ -722,7 +732,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public void close() public void close()
{ {
LOG.debug("closed accept of {}", channel); LOG.debug("closed accept of {}", channel);
closeNoExceptions(channel); IO.close(channel);
} }
@Override @Override
@ -735,7 +745,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
closeNoExceptions(channel); IO.close(channel);
_selectorManager.onAcceptFailed(channel, x); _selectorManager.onAcceptFailed(channel, x);
LOG.debug(x); LOG.debug(x);
} }
@ -758,7 +768,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
protected void failed(Throwable failure) protected void failed(Throwable failure)
{ {
closeNoExceptions(channel); IO.close(channel);
LOG.warn(String.valueOf(failure)); LOG.warn(String.valueOf(failure));
LOG.debug(failure); LOG.debug(failure);
_selectorManager.onAcceptFailed(channel, failure); _selectorManager.onAcceptFailed(channel, failure);
@ -808,7 +818,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (failed.compareAndSet(false, true)) if (failed.compareAndSet(false, true))
{ {
timeout.cancel(); timeout.cancel();
closeNoExceptions(channel); IO.close(channel);
ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment); ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
} }
} }
@ -864,12 +874,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
if (_closed == null) if (_closed == null)
{ {
closeNoExceptions(closeable); IO.close(closeable);
} }
else if (!_closed.contains(closeable)) else if (!_closed.contains(closeable))
{ {
_closed.add(closeable); _closed.add(closeable);
closeNoExceptions(closeable); IO.close(closeable);
} }
} }
} }
@ -894,12 +904,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment); IO.close((EndPoint)attachment);
} }
} }
_selector = null; _selector = null;
closeNoExceptions(selector); IO.close(selector);
_stopped.countDown(); _stopped.countDown();
} }
} }
@ -924,7 +934,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable failure) catch (Throwable failure)
{ {
closeNoExceptions(_connect.channel); IO.close(_connect.channel);
LOG.warn(String.valueOf(failure)); LOG.warn(String.valueOf(failure));
LOG.debug(failure); LOG.debug(failure);
_connect.failed(failure); _connect.failed(failure);
@ -955,7 +965,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
Connection connection = endPoint.getConnection(); Connection connection = endPoint.getConnection();
if (connection != null) if (connection != null)
_selectorManager.connectionClosed(connection); _selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint); ManagedSelector.this.endPointClosed(endPoint);
} }
@Override @Override

View File

@ -0,0 +1,395 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.test;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.Selector;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FailedSelectorTest
{
private static final Logger LOG = Log.getLogger(FailedSelectorTest.class);
private HttpClient client;
private Server server;
private StacklessLogging stacklessManagedSelector;
@AfterEach
public void stopServerAndClient() throws Exception
{
server.stop();
client.stop();
stacklessManagedSelector.close();
}
@BeforeEach
public void startClient() throws Exception
{
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
QueuedThreadPool qtp = new QueuedThreadPool();
qtp.setName("Client");
qtp.setStopTimeout(1000);
client = new HttpClient(transport, null);
client.setExecutor(qtp);
client.setIdleTimeout(1000);
client.setMaxConnectionsPerDestination(1);
client.setMaxRequestsQueuedPerDestination(1);
client.start();
}
public void startServer(Function<Server, ServerConnector> customizeServerConsumer) throws Exception
{
stacklessManagedSelector = new StacklessLogging(ManagedSelector.class);
server = new Server();
server.setStopTimeout(1000);
server.setStopAtShutdown(true);
ServerConnector connector = customizeServerConsumer.apply(server);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
context.addServlet(HelloServlet.class, "/hello");
ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector));
context.addServlet(closeHolder, "/selector/close");
HandlerList handlers = new HandlerList();
handlers.addHandler(context);
handlers.addHandler(new DefaultHandler());
server.setHandler(handlers);
server.start();
}
@Test
public void testRestartServerOnSelectFailure() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
startServer((server) ->
{
RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartServerTask(server, failedLatch));
connector.setPort(0);
connector.setIdleTimeout(1000);
return connector;
});
// Request /hello
assertRequestHello();
// Request /selector/close
assertRequestSelectorClose();
// Wait for selectors to close from action above
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
// Request /hello
assertRequestHello();
}
@Test
public void testRestartSelectorOnSelectFailure() throws Exception
{
CountDownLatch failedLatch = new CountDownLatch(1);
startServer((server) ->
{
RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartSelectorTask(failedLatch));
connector.setPort(0);
connector.setIdleTimeout(1000);
return connector;
});
// Request /hello
assertRequestHello();
// Request /selector/close
assertRequestSelectorClose();
// Wait for selectors to close from action above
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
// Request /hello
assertRequestHello();
}
private void assertRequestSelectorClose() throws InterruptedException, ExecutionException, TimeoutException
{
URI dest = server.getURI().resolve("/selector/close");
LOG.info("Requesting GET on {}", dest);
ContentResponse response = client.newRequest(dest)
.method(HttpMethod.GET)
.header(HttpHeader.CONNECTION, "close")
.send();
assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200));
assertThat(dest + " response", response.getContentAsString(), startsWith("Closing selectors "));
}
private void assertRequestHello() throws InterruptedException, ExecutionException, TimeoutException
{
URI dest = server.getURI().resolve("/hello");
LOG.info("Requesting GET on {}", dest);
ContentResponse response = client.newRequest(dest)
.method(HttpMethod.GET)
.header(HttpHeader.CONNECTION, "close")
.send();
assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200));
assertThat(dest + " response", response.getContentAsString(), startsWith("Hello "));
}
public static class HelloServlet extends HttpServlet
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
resp.setContentType("text/plain");
resp.setCharacterEncoding("utf-8");
resp.getWriter().printf("Hello %s:%d%n", req.getRemoteAddr(), req.getRemotePort());
}
}
public static class CloseSelectorServlet extends HttpServlet
{
private static final int DELAY_MS = 500;
private ServerConnector connector;
private ScheduledExecutorService scheduledExecutorService;
public CloseSelectorServlet(ServerConnector connector)
{
this.connector = connector;
scheduledExecutorService = Executors.newScheduledThreadPool(5);
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
resp.setContentType("text/plain");
resp.setCharacterEncoding("utf-8");
resp.setHeader("Connection", "close");
resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS);
scheduledExecutorService.schedule(new ForceCloseSelectorTask(connector), DELAY_MS, TimeUnit.MILLISECONDS);
}
}
public static class RestartSelectorCustomConnector extends ServerConnector
{
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
public RestartSelectorCustomConnector(Server server, int acceptors, int selectors, Consumer<CustomManagedSelector> onSelectFailConsumer)
{
super(server, acceptors, selectors);
this.onSelectFailConsumer = onSelectFailConsumer;
}
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new CustomManagedSelector(this, id, onSelectFailConsumer);
}
};
}
}
public static class CustomManagedSelector extends ManagedSelector
{
private final Set<EndPoint> endpoints = ConcurrentHashMap.newKeySet();
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
public CustomManagedSelector(SelectorManager selectorManager, int id, Consumer<CustomManagedSelector> onSelectFailConsumer)
{
super(selectorManager, id);
this.onSelectFailConsumer = onSelectFailConsumer;
}
@Override
protected void endPointOpened(EndPoint endPoint)
{
super.endPointOpened(endPoint);
endpoints.add(endPoint);
}
@Override
protected void endPointClosed(EndPoint endPoint)
{
super.endPointClosed(endPoint);
endpoints.remove(endPoint);
}
@Override
protected void onSelectFailed(Throwable cause)
{
endpoints.forEach((endpoint) ->
{
if (endpoint.getConnection() != null)
{
IO.close(endpoint.getConnection());
}
IO.close(endpoint);
});
endpoints.clear();
new Thread(() -> onSelectFailConsumer.accept(this), "OnSelectFailedTask").start();
}
}
private static class RestartSelectorTask implements Consumer<CustomManagedSelector>
{
private static final Logger LOG = Log.getLogger(RestartSelectorTask.class);
private final CountDownLatch latch;
public RestartSelectorTask(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public void accept(CustomManagedSelector customManagedSelector)
{
try
{
customManagedSelector.stop();
customManagedSelector.start();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
latch.countDown();
}
}
}
private static class RestartServerTask implements Consumer<CustomManagedSelector>
{
private static final Logger LOG = Log.getLogger(RestartServerTask.class);
private final Server server;
private final CountDownLatch latch;
public RestartServerTask(Server server, CountDownLatch latch)
{
this.server = server;
this.latch = latch;
}
@Override
public void accept(CustomManagedSelector customManagedSelector)
{
try
{
server.stop();
server.start();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
latch.countDown();
}
}
}
private static class ForceCloseSelectorTask implements Runnable
{
private static final Logger LOG = Log.getLogger(ForceCloseSelectorTask.class);
private final ServerConnector connector;
public ForceCloseSelectorTask(ServerConnector connector)
{
this.connector = connector;
}
@Override
public void run()
{
SelectorManager selectorManager = connector.getSelectorManager();
Collection<ManagedSelector> managedSelectors = selectorManager.getBeans(ManagedSelector.class);
for (ManagedSelector managedSelector : managedSelectors)
{
if (managedSelector instanceof CustomManagedSelector)
{
CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector;
Selector selector = customManagedSelector.getSelector();
LOG.debug("Closing selector {}}", selector);
IO.close(selector);
}
}
}
}
}

View File

@ -1,3 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.websocket.LEVEL=DEBUG #org.eclipse.jetty.websocket.LEVEL=DEBUG