From ab849e8cc5c9408e633a98977bbd3ccf0c1f0040 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Sat, 28 Oct 2017 14:54:08 +1100 Subject: [PATCH] Issue #1920 - Connect Timeouts with NonBlocking CreateEndPoint. Added a LiveLock (BusyBlocking) test. Modified ManagedSelector to fair share between actions and selecting --- .../eclipse/jetty/client/LivelockTest.java | 134 ++++++++++++++++++ .../org/eclipse/jetty/io/ChannelEndPoint.java | 1 - .../org/eclipse/jetty/io/ManagedSelector.java | 88 ++++++------ .../java/org/eclipse/jetty/io/IOTest.java | 63 ++++++++ .../eclipse/jetty/util/thread/Invocable.java | 10 ++ 5 files changed, 253 insertions(+), 43 deletions(-) create mode 100644 jetty-client/src/test/java/org/eclipse/jetty/client/LivelockTest.java diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/LivelockTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/LivelockTest.java new file mode 100644 index 00000000000..71dc8c0e932 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/LivelockTest.java @@ -0,0 +1,134 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.SocketAddressResolver; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class LivelockTest +{ + private Server server; + private ServerConnector connector; + private HttpClient client; + + @Before + public void before() throws Exception + { + Handler handler = new EmptyServerHandler(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + connector = new ServerConnector(server, 1, 1); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + @After + public void after() throws Exception + { + client.stop(); + server.stop(); + } + + @Test + @Slow + public void testLivelock() throws Exception + { + // This test applies a moderate connect/request load (5/s) over 5 seconds, + // with a connect timeout of 100, so any delayed connects will be detected. + // NonBlocking runnables are submitted to both the client and server + // ManagedSelectors that submit themselves in an attempt to cause a live lock + // as there will always be an action available to run + + int count = 25; + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + client = new HttpClient(transport, null); + client.setMaxConnectionsPerDestination(2 * count); + client.setMaxRequestsQueuedPerDestination(2 * count); + client.setSocketAddressResolver(new SocketAddressResolver.Sync()); + client.setConnectBlocking(false); + client.setConnectTimeout(1000); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + client.setExecutor(clientThreads); + client.start(); + + AtomicBoolean busy = new AtomicBoolean(true); + + ManagedSelector clientSelector = client.getContainedBeans(ManagedSelector.class).stream().findAny().get(); + Runnable clientLivelock = new Invocable.NonBlocking() + { + @Override + public void run() + { + try { Thread.sleep(10);} catch(Exception e) {} + if (busy.get()) + clientSelector.submit(this); + } + }; + clientSelector.submit(clientLivelock); + + ManagedSelector serverSelector = connector.getContainedBeans(ManagedSelector.class).stream().findAny().get(); + Runnable serverLivelock = new Invocable.NonBlocking() + { + @Override + public void run() + { + try { Thread.sleep(10);} catch(Exception e) {} + if (busy.get()) + serverSelector.submit(this); + } + }; + serverSelector.submit(serverLivelock); + + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + client.newRequest("localhost", connector.getLocalPort()) + .path("/" + i) + .send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + Thread.sleep(200); + } + Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + busy.set(false); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 84c0cc96ac1..40dd69eaf80 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -429,7 +429,6 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage _selector.submit(_runUpdateKey); } - @Override public String toEndPointString() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 0bd20350837..1d85e72a0c8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -44,7 +44,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; @@ -59,6 +58,7 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; public class ManagedSelector extends ContainerLifeCycle implements Dumpable { private static final Logger LOG = Log.getLogger(ManagedSelector.class); + private static final long MAX_ACTION_PERIOD_MS = Long.getLong("org.eclipse.jetty.io.ManagedSelector.MAX_ACTION_PERIOD_MS",100); private final Locker _locker = new Locker(); private boolean _selecting = false; @@ -67,6 +67,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private final int _id; private final ExecutionStrategy _strategy; private Selector _selector; + private long _actionTime = -1; + public ManagedSelector(SelectorManager selectorManager, int id) { @@ -134,6 +136,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable try (Locker.Lock lock = _locker.lock()) { _actions.offer(change); + if (_selecting) { selector = _selector; @@ -293,7 +296,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable if (action != null) return action; - update(); + updateKeys(); if (!select()) return null; @@ -302,34 +305,41 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private Runnable nextAction() { - while (true) + Runnable action; + long now = System.nanoTime(); + try (Locker.Lock lock = _locker.lock()) { - Runnable action; - try (Locker.Lock lock = _locker.lock()) - { - action = _actions.poll(); - if (action == null) - { - // No more actions, so we need to select - _selecting = true; - return null; - } - } - - if (Invocable.getInvocationType(action)==InvocationType.BLOCKING) - return action; - - try + + // It is important to avoid live-lock (busy blocking) here. If too many actions + // are submitted, this can indefinitely defer selection happening. Similarly if + // we give too much priority to selection, it may prevent actions from being run. + // The solution implemented here is to put a maximum time limit on handling actions + // so that this method will fall through to selection if more than MAX_ACTION_PERIOD_MS + // is spent running actions. The time period is cleared whenever a selection occurs, + // so that a full period can be spent on actions after every select. + + if (_actionTime==-1) + _actionTime = now; + else if ((now-_actionTime)>TimeUnit.MILLISECONDS.toNanos(100) && _actions.size()>0) { + // Too long spent handling actions, give selection a go by, + // immediate wakeup (as if remaining action were just added). + _selector.wakeup(); + _selecting = false; + _actionTime = -1; if (LOG.isDebugEnabled()) - LOG.debug("Running action {}", action); - // Running the change may queue another action. - action.run(); + LOG.debug("force select q={}",_actions.size()); + return null; } - catch (Throwable x) + + action = _actions.poll(); + if (action == null) { - LOG.debug("Could not run action " + action, x); + // No more actions, so we time to do some selecting + _selecting = true; + _actionTime = -1; } + return action; } } @@ -428,8 +438,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable return null; } - private void update() + private void updateKeys() { + // Do update keys for only previously selected keys. + // This will update only those keys whose selection did not cause an + // updateKeys action to be submitted. for (SelectionKey key : _keys) updateKey(key); _keys.clear(); @@ -449,16 +462,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private abstract static class NonBlockingAction implements Runnable, Invocable - { - @Override - public final InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - } - - private class DumpKeys extends NonBlockingAction + private class DumpKeys extends Invocable.NonBlocking { private final CountDownLatch latch = new CountDownLatch(1); private final List _dumps; @@ -504,7 +508,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - class Acceptor extends NonBlockingAction implements Selectable, Closeable + class Acceptor extends Invocable.NonBlocking implements Selectable, Closeable { private final SelectableChannel _channel; private SelectionKey _key; @@ -573,7 +577,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - class Accept extends NonBlockingAction implements Closeable + class Accept extends Invocable.NonBlocking implements Closeable { private final SelectableChannel channel; private final Object attachment; @@ -646,7 +650,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - class Connect extends NonBlockingAction + class Connect extends Invocable.NonBlocking { private final AtomicBoolean failed = new AtomicBoolean(); private final SelectableChannel channel; @@ -684,7 +688,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private class ConnectTimeout extends NonBlockingAction + private class ConnectTimeout extends Invocable.NonBlocking { private final Connect connect; @@ -706,7 +710,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private class CloseEndPoints extends NonBlockingAction + private class CloseEndPoints extends Invocable.NonBlocking { private final CountDownLatch _latch = new CountDownLatch(1); private CountDownLatch _allClosed; @@ -772,7 +776,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private class CloseSelector extends NonBlockingAction + private class CloseSelector extends Invocable.NonBlocking { private CountDownLatch _latch = new CountDownLatch(1); @@ -798,7 +802,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private class DestroyEndPoint extends NonBlockingAction implements Closeable + private class DestroyEndPoint extends Invocable.NonBlocking implements Closeable { private final EndPoint endPoint; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java index 1961fd58279..e2ebdf29cdc 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java @@ -18,8 +18,11 @@ package org.eclipse.jetty.io; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -35,6 +38,8 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.FileChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; @@ -47,6 +52,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -487,4 +493,61 @@ public class IOTest for (int i=0;iA task (typically either a {@link Runnable} or {@link Callable} @@ -186,4 +187,13 @@ public interface Invocable return InvocationType.BLOCKING; } + public abstract class NonBlocking implements Runnable, Invocable + { + @Override + public final InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + } + }