Merge branch 'jetty-9.4.x' into jetty-10.0.x
Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com> # Conflicts: # jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
This commit is contained in:
commit
6e740060f7
2
KEYS.txt
2
KEYS.txt
|
@ -2,6 +2,6 @@
|
||||||
Jan Bartel <janb@mortbay.com> AED5 EE6C 45D0 FE8D 5D1B 164F 27DE D4BF 6216 DB8F
|
Jan Bartel <janb@mortbay.com> AED5 EE6C 45D0 FE8D 5D1B 164F 27DE D4BF 6216 DB8F
|
||||||
Jesse McConnell <jesse.mcconnell@gmail.com> 2A68 4B57 436A 81FA 8706 B53C 61C3 351A 438A 3B7D
|
Jesse McConnell <jesse.mcconnell@gmail.com> 2A68 4B57 436A 81FA 8706 B53C 61C3 351A 438A 3B7D
|
||||||
Joakim Erdfelt <joakim.erdfelt@gmail.com> 5989 BAF7 6217 B843 D66B E55B 2D0E 1FB8 FE4B 68B4
|
Joakim Erdfelt <joakim.erdfelt@gmail.com> 5989 BAF7 6217 B843 D66B E55B 2D0E 1FB8 FE4B 68B4
|
||||||
Joakim Erdfelt <joakim@apache.org> B59B 67FD 7904 9843 67F9 3180 0818 D9D6 8FB6 7BAC
|
Joakim Erdfelt <joakime@apache.org> B59B 67FD 7904 9843 67F9 3180 0818 D9D6 8FB6 7BAC
|
||||||
Joakim Erdfelt <joakim@erdfelt.com> BFBB 21C2 46D7 7768 3628 7A48 A04E 0C74 ABB3 5FEA
|
Joakim Erdfelt <joakim@erdfelt.com> BFBB 21C2 46D7 7768 3628 7A48 A04E 0C74 ABB3 5FEA
|
||||||
Simone Bordet <simone.bordet@gmail.com> 8B09 6546 B1A8 F026 56B1 5D3B 1677 D141 BCF3 584D
|
Simone Bordet <simone.bordet@gmail.com> 8B09 6546 B1A8 F026 56B1 5D3B 1677 D141 BCF3 584D
|
||||||
|
|
|
@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.IO;
|
||||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||||
import org.eclipse.jetty.util.component.Dumpable;
|
import org.eclipse.jetty.util.component.Dumpable;
|
||||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||||
|
@ -122,12 +123,20 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
start._started.await();
|
start._started.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void onSelectFailed(Throwable cause)
|
||||||
|
{
|
||||||
|
// override to change behavior
|
||||||
|
}
|
||||||
|
|
||||||
public int size()
|
public int size()
|
||||||
{
|
{
|
||||||
Selector s = _selector;
|
Selector s = _selector;
|
||||||
if (s == null)
|
if (s == null)
|
||||||
return 0;
|
return 0;
|
||||||
return s.keys().size();
|
Set<SelectionKey> keys = s.keys();
|
||||||
|
if (keys == null)
|
||||||
|
return 0;
|
||||||
|
return keys.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -135,7 +144,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
{
|
{
|
||||||
// doStop might be called for a failed managedSelector,
|
// doStop might be called for a failed managedSelector,
|
||||||
// We do not want to wait twice, so we only stop once for each start
|
// We do not want to wait twice, so we only stop once for each start
|
||||||
if (_started.compareAndSet(true, false))
|
if (_started.compareAndSet(true, false) && _selector != null)
|
||||||
{
|
{
|
||||||
// Close connections, but only wait a single selector cycle for it to take effect
|
// Close connections, but only wait a single selector cycle for it to take effect
|
||||||
CloseConnections closeConnections = new CloseConnections();
|
CloseConnections closeConnections = new CloseConnections();
|
||||||
|
@ -210,7 +219,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
catch (RejectedExecutionException x)
|
catch (RejectedExecutionException x)
|
||||||
{
|
{
|
||||||
if (task instanceof Closeable)
|
if (task instanceof Closeable)
|
||||||
closeNoExceptions((Closeable)task);
|
IO.close((Closeable)task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,17 +255,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void closeNoExceptions(Closeable closeable)
|
protected void endPointOpened(EndPoint endPoint)
|
||||||
{
|
{
|
||||||
try
|
_selectorManager.endPointOpened(endPoint);
|
||||||
{
|
|
||||||
if (closeable != null)
|
|
||||||
closeable.close();
|
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
|
||||||
|
protected void endPointClosed(EndPoint endPoint)
|
||||||
{
|
{
|
||||||
LOG.ignore(x);
|
_selectorManager.endPointClosed(endPoint);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
|
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
|
||||||
|
@ -266,7 +272,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
endPoint.setConnection(connection);
|
endPoint.setConnection(connection);
|
||||||
selectionKey.attach(endPoint);
|
selectionKey.attach(endPoint);
|
||||||
endPoint.onOpen();
|
endPoint.onOpen();
|
||||||
_selectorManager.endPointOpened(endPoint);
|
endPointOpened(endPoint);
|
||||||
_selectorManager.connectionOpened(connection);
|
_selectorManager.connectionOpened(connection);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Created {}", endPoint);
|
LOG.debug("Created {}", endPoint);
|
||||||
|
@ -496,15 +502,19 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
|
IO.close(_selector);
|
||||||
_selector = null;
|
_selector = null;
|
||||||
|
|
||||||
if (isRunning())
|
if (isRunning())
|
||||||
LOG.warn(x);
|
{
|
||||||
|
LOG.warn("Fatal select() failure", x);
|
||||||
|
onSelectFailed(x);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG.warn(x.toString());
|
LOG.warn(x.toString());
|
||||||
LOG.debug(x);
|
LOG.debug(x);
|
||||||
}
|
}
|
||||||
closeNoExceptions(_selector);
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -541,13 +551,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
{
|
{
|
||||||
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
|
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
|
||||||
if (attachment instanceof EndPoint)
|
if (attachment instanceof EndPoint)
|
||||||
closeNoExceptions((EndPoint)attachment);
|
IO.close((EndPoint)attachment);
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
LOG.warn("Could not process key for channel " + key.channel(), x);
|
LOG.warn("Could not process key for channel " + key.channel(), x);
|
||||||
if (attachment instanceof EndPoint)
|
if (attachment instanceof EndPoint)
|
||||||
closeNoExceptions((EndPoint)attachment);
|
IO.close((EndPoint)attachment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -556,7 +566,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
|
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
|
||||||
Object attachment = key.attachment();
|
Object attachment = key.attachment();
|
||||||
if (attachment instanceof EndPoint)
|
if (attachment instanceof EndPoint)
|
||||||
closeNoExceptions((EndPoint)attachment);
|
IO.close((EndPoint)attachment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -661,7 +671,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
closeNoExceptions(_channel);
|
IO.close(_channel);
|
||||||
LOG.warn(x);
|
LOG.warn(x);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -683,7 +693,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
closeNoExceptions(channel);
|
IO.close(channel);
|
||||||
LOG.warn("Accept failed for channel " + channel, x);
|
LOG.warn("Accept failed for channel " + channel, x);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -722,7 +732,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
LOG.debug("closed accept of {}", channel);
|
LOG.debug("closed accept of {}", channel);
|
||||||
closeNoExceptions(channel);
|
IO.close(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -735,7 +745,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
closeNoExceptions(channel);
|
IO.close(channel);
|
||||||
_selectorManager.onAcceptFailed(channel, x);
|
_selectorManager.onAcceptFailed(channel, x);
|
||||||
LOG.debug(x);
|
LOG.debug(x);
|
||||||
}
|
}
|
||||||
|
@ -758,7 +768,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
|
|
||||||
protected void failed(Throwable failure)
|
protected void failed(Throwable failure)
|
||||||
{
|
{
|
||||||
closeNoExceptions(channel);
|
IO.close(channel);
|
||||||
LOG.warn(String.valueOf(failure));
|
LOG.warn(String.valueOf(failure));
|
||||||
LOG.debug(failure);
|
LOG.debug(failure);
|
||||||
_selectorManager.onAcceptFailed(channel, failure);
|
_selectorManager.onAcceptFailed(channel, failure);
|
||||||
|
@ -808,7 +818,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
if (failed.compareAndSet(false, true))
|
if (failed.compareAndSet(false, true))
|
||||||
{
|
{
|
||||||
timeout.cancel();
|
timeout.cancel();
|
||||||
closeNoExceptions(channel);
|
IO.close(channel);
|
||||||
ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
|
ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -864,12 +874,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
{
|
{
|
||||||
if (_closed == null)
|
if (_closed == null)
|
||||||
{
|
{
|
||||||
closeNoExceptions(closeable);
|
IO.close(closeable);
|
||||||
}
|
}
|
||||||
else if (!_closed.contains(closeable))
|
else if (!_closed.contains(closeable))
|
||||||
{
|
{
|
||||||
_closed.add(closeable);
|
_closed.add(closeable);
|
||||||
closeNoExceptions(closeable);
|
IO.close(closeable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -894,12 +904,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
{
|
{
|
||||||
Object attachment = key.attachment();
|
Object attachment = key.attachment();
|
||||||
if (attachment instanceof EndPoint)
|
if (attachment instanceof EndPoint)
|
||||||
closeNoExceptions((EndPoint)attachment);
|
IO.close((EndPoint)attachment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_selector = null;
|
_selector = null;
|
||||||
closeNoExceptions(selector);
|
IO.close(selector);
|
||||||
_stopped.countDown();
|
_stopped.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -924,7 +934,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
}
|
}
|
||||||
catch (Throwable failure)
|
catch (Throwable failure)
|
||||||
{
|
{
|
||||||
closeNoExceptions(_connect.channel);
|
IO.close(_connect.channel);
|
||||||
LOG.warn(String.valueOf(failure));
|
LOG.warn(String.valueOf(failure));
|
||||||
LOG.debug(failure);
|
LOG.debug(failure);
|
||||||
_connect.failed(failure);
|
_connect.failed(failure);
|
||||||
|
@ -957,7 +967,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
Connection connection = endPoint.getConnection();
|
Connection connection = endPoint.getConnection();
|
||||||
if (connection != null)
|
if (connection != null)
|
||||||
_selectorManager.connectionClosed(connection, cause);
|
_selectorManager.connectionClosed(connection, cause);
|
||||||
_selectorManager.endPointClosed(endPoint);
|
ManagedSelector.this.endPointClosed(endPoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,395 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.channels.Selector;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
|
import org.eclipse.jetty.client.HttpClientTransport;
|
||||||
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
|
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||||
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
|
import org.eclipse.jetty.io.ManagedSelector;
|
||||||
|
import org.eclipse.jetty.io.SelectorManager;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.eclipse.jetty.util.IO;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class FailedSelectorTest
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(FailedSelectorTest.class);
|
||||||
|
private HttpClient client;
|
||||||
|
private Server server;
|
||||||
|
private StacklessLogging stacklessManagedSelector;
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void stopServerAndClient() throws Exception
|
||||||
|
{
|
||||||
|
server.stop();
|
||||||
|
client.stop();
|
||||||
|
stacklessManagedSelector.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void startClient() throws Exception
|
||||||
|
{
|
||||||
|
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||||
|
QueuedThreadPool qtp = new QueuedThreadPool();
|
||||||
|
qtp.setName("Client");
|
||||||
|
qtp.setStopTimeout(1000);
|
||||||
|
client = new HttpClient(transport);
|
||||||
|
client.setExecutor(qtp);
|
||||||
|
|
||||||
|
client.setIdleTimeout(1000);
|
||||||
|
client.setMaxConnectionsPerDestination(1);
|
||||||
|
client.setMaxRequestsQueuedPerDestination(1);
|
||||||
|
client.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startServer(Function<Server, ServerConnector> customizeServerConsumer) throws Exception
|
||||||
|
{
|
||||||
|
stacklessManagedSelector = new StacklessLogging(ManagedSelector.class);
|
||||||
|
|
||||||
|
server = new Server();
|
||||||
|
server.setStopTimeout(1000);
|
||||||
|
server.setStopAtShutdown(true);
|
||||||
|
|
||||||
|
ServerConnector connector = customizeServerConsumer.apply(server);
|
||||||
|
server.addConnector(connector);
|
||||||
|
|
||||||
|
ServletContextHandler context = new ServletContextHandler();
|
||||||
|
context.setContextPath("/");
|
||||||
|
context.addServlet(HelloServlet.class, "/hello");
|
||||||
|
|
||||||
|
ServletHolder closeHolder = new ServletHolder(new CloseSelectorServlet(connector));
|
||||||
|
context.addServlet(closeHolder, "/selector/close");
|
||||||
|
|
||||||
|
HandlerList handlers = new HandlerList();
|
||||||
|
handlers.addHandler(context);
|
||||||
|
handlers.addHandler(new DefaultHandler());
|
||||||
|
|
||||||
|
server.setHandler(handlers);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestartServerOnSelectFailure() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch failedLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
startServer((server) ->
|
||||||
|
{
|
||||||
|
RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartServerTask(server, failedLatch));
|
||||||
|
connector.setPort(0);
|
||||||
|
connector.setIdleTimeout(1000);
|
||||||
|
return connector;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Request /hello
|
||||||
|
assertRequestHello();
|
||||||
|
|
||||||
|
// Request /selector/close
|
||||||
|
assertRequestSelectorClose();
|
||||||
|
|
||||||
|
// Wait for selectors to close from action above
|
||||||
|
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Request /hello
|
||||||
|
assertRequestHello();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestartSelectorOnSelectFailure() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch failedLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
startServer((server) ->
|
||||||
|
{
|
||||||
|
RestartSelectorCustomConnector connector = new RestartSelectorCustomConnector(server, 1, 1, new RestartSelectorTask(failedLatch));
|
||||||
|
connector.setPort(0);
|
||||||
|
connector.setIdleTimeout(1000);
|
||||||
|
return connector;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Request /hello
|
||||||
|
assertRequestHello();
|
||||||
|
|
||||||
|
// Request /selector/close
|
||||||
|
assertRequestSelectorClose();
|
||||||
|
|
||||||
|
// Wait for selectors to close from action above
|
||||||
|
assertTrue(failedLatch.await(2, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Request /hello
|
||||||
|
assertRequestHello();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertRequestSelectorClose() throws InterruptedException, ExecutionException, TimeoutException
|
||||||
|
{
|
||||||
|
URI dest = server.getURI().resolve("/selector/close");
|
||||||
|
LOG.info("Requesting GET on {}", dest);
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest(dest)
|
||||||
|
.method(HttpMethod.GET)
|
||||||
|
.header(HttpHeader.CONNECTION, "close")
|
||||||
|
.send();
|
||||||
|
|
||||||
|
assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200));
|
||||||
|
assertThat(dest + " response", response.getContentAsString(), startsWith("Closing selectors "));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertRequestHello() throws InterruptedException, ExecutionException, TimeoutException
|
||||||
|
{
|
||||||
|
URI dest = server.getURI().resolve("/hello");
|
||||||
|
LOG.info("Requesting GET on {}", dest);
|
||||||
|
ContentResponse response = client.newRequest(dest)
|
||||||
|
.method(HttpMethod.GET)
|
||||||
|
.header(HttpHeader.CONNECTION, "close")
|
||||||
|
.send();
|
||||||
|
|
||||||
|
assertThat(dest + " status", response.getStatus(), is(HttpStatus.OK_200));
|
||||||
|
assertThat(dest + " response", response.getContentAsString(), startsWith("Hello "));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HelloServlet extends HttpServlet
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
|
||||||
|
{
|
||||||
|
resp.setContentType("text/plain");
|
||||||
|
resp.setCharacterEncoding("utf-8");
|
||||||
|
resp.getWriter().printf("Hello %s:%d%n", req.getRemoteAddr(), req.getRemotePort());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CloseSelectorServlet extends HttpServlet
|
||||||
|
{
|
||||||
|
private static final int DELAY_MS = 500;
|
||||||
|
private ServerConnector connector;
|
||||||
|
private ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
|
public CloseSelectorServlet(ServerConnector connector)
|
||||||
|
{
|
||||||
|
this.connector = connector;
|
||||||
|
scheduledExecutorService = Executors.newScheduledThreadPool(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
|
||||||
|
{
|
||||||
|
resp.setContentType("text/plain");
|
||||||
|
resp.setCharacterEncoding("utf-8");
|
||||||
|
resp.setHeader("Connection", "close");
|
||||||
|
resp.getWriter().printf("Closing selectors in %,d ms%n", DELAY_MS);
|
||||||
|
scheduledExecutorService.schedule(new ForceCloseSelectorTask(connector), DELAY_MS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RestartSelectorCustomConnector extends ServerConnector
|
||||||
|
{
|
||||||
|
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
|
||||||
|
|
||||||
|
public RestartSelectorCustomConnector(Server server, int acceptors, int selectors, Consumer<CustomManagedSelector> onSelectFailConsumer)
|
||||||
|
{
|
||||||
|
super(server, acceptors, selectors);
|
||||||
|
this.onSelectFailConsumer = onSelectFailConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
|
||||||
|
{
|
||||||
|
return new ServerConnectorManager(executor, scheduler, selectors)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected ManagedSelector newSelector(int id)
|
||||||
|
{
|
||||||
|
return new CustomManagedSelector(this, id, onSelectFailConsumer);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CustomManagedSelector extends ManagedSelector
|
||||||
|
{
|
||||||
|
private final Set<EndPoint> endpoints = ConcurrentHashMap.newKeySet();
|
||||||
|
private final Consumer<CustomManagedSelector> onSelectFailConsumer;
|
||||||
|
|
||||||
|
public CustomManagedSelector(SelectorManager selectorManager, int id, Consumer<CustomManagedSelector> onSelectFailConsumer)
|
||||||
|
{
|
||||||
|
super(selectorManager, id);
|
||||||
|
this.onSelectFailConsumer = onSelectFailConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void endPointOpened(EndPoint endPoint)
|
||||||
|
{
|
||||||
|
super.endPointOpened(endPoint);
|
||||||
|
endpoints.add(endPoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void endPointClosed(EndPoint endPoint)
|
||||||
|
{
|
||||||
|
super.endPointClosed(endPoint);
|
||||||
|
endpoints.remove(endPoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void onSelectFailed(Throwable cause)
|
||||||
|
{
|
||||||
|
endpoints.forEach((endpoint) ->
|
||||||
|
{
|
||||||
|
if (endpoint.getConnection() != null)
|
||||||
|
{
|
||||||
|
IO.close(endpoint.getConnection());
|
||||||
|
}
|
||||||
|
IO.close(endpoint);
|
||||||
|
});
|
||||||
|
endpoints.clear();
|
||||||
|
|
||||||
|
new Thread(() -> onSelectFailConsumer.accept(this), "OnSelectFailedTask").start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartSelectorTask implements Consumer<CustomManagedSelector>
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(RestartSelectorTask.class);
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
|
||||||
|
public RestartSelectorTask(CountDownLatch latch)
|
||||||
|
{
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(CustomManagedSelector customManagedSelector)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
customManagedSelector.stop();
|
||||||
|
customManagedSelector.start();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartServerTask implements Consumer<CustomManagedSelector>
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(RestartServerTask.class);
|
||||||
|
private final Server server;
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
|
||||||
|
public RestartServerTask(Server server, CountDownLatch latch)
|
||||||
|
{
|
||||||
|
this.server = server;
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(CustomManagedSelector customManagedSelector)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ForceCloseSelectorTask implements Runnable
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(ForceCloseSelectorTask.class);
|
||||||
|
private final ServerConnector connector;
|
||||||
|
|
||||||
|
public ForceCloseSelectorTask(ServerConnector connector)
|
||||||
|
{
|
||||||
|
this.connector = connector;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
SelectorManager selectorManager = connector.getSelectorManager();
|
||||||
|
Collection<ManagedSelector> managedSelectors = selectorManager.getBeans(ManagedSelector.class);
|
||||||
|
for (ManagedSelector managedSelector : managedSelectors)
|
||||||
|
{
|
||||||
|
if (managedSelector instanceof CustomManagedSelector)
|
||||||
|
{
|
||||||
|
CustomManagedSelector customManagedSelector = (CustomManagedSelector)managedSelector;
|
||||||
|
Selector selector = customManagedSelector.getSelector();
|
||||||
|
LOG.debug("Closing selector {}}", selector);
|
||||||
|
IO.close(selector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||||
|
org.eclipse.jetty.LEVEL=WARN
|
||||||
#org.eclipse.jetty.LEVEL=DEBUG
|
#org.eclipse.jetty.LEVEL=DEBUG
|
||||||
#org.eclipse.jetty.websocket.LEVEL=DEBUG
|
#org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||||
|
|
Loading…
Reference in New Issue