Issue #1920 - Connect Timeouts with NonBlocking CreateEndPoint.
Added a LiveLock (BusyBlocking) test. Modified ManagedSelector to fair share between actions and selecting
This commit is contained in:
parent
9fd47fd383
commit
ab849e8cc5
|
@ -0,0 +1,134 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.ManagedSelector;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.toolchain.test.annotation.Slow;
|
||||
import org.eclipse.jetty.util.SocketAddressResolver;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LivelockTest
|
||||
{
|
||||
private Server server;
|
||||
private ServerConnector connector;
|
||||
private HttpClient client;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception
|
||||
{
|
||||
Handler handler = new EmptyServerHandler();
|
||||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
serverThreads.setName("server");
|
||||
server = new Server(serverThreads);
|
||||
connector = new ServerConnector(server, 1, 1);
|
||||
server.addConnector(connector);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Slow
|
||||
public void testLivelock() throws Exception
|
||||
{
|
||||
// This test applies a moderate connect/request load (5/s) over 5 seconds,
|
||||
// with a connect timeout of 100, so any delayed connects will be detected.
|
||||
// NonBlocking runnables are submitted to both the client and server
|
||||
// ManagedSelectors that submit themselves in an attempt to cause a live lock
|
||||
// as there will always be an action available to run
|
||||
|
||||
int count = 25;
|
||||
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||
client = new HttpClient(transport, null);
|
||||
client.setMaxConnectionsPerDestination(2 * count);
|
||||
client.setMaxRequestsQueuedPerDestination(2 * count);
|
||||
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
|
||||
client.setConnectBlocking(false);
|
||||
client.setConnectTimeout(1000);
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("client");
|
||||
client.setExecutor(clientThreads);
|
||||
client.start();
|
||||
|
||||
AtomicBoolean busy = new AtomicBoolean(true);
|
||||
|
||||
ManagedSelector clientSelector = client.getContainedBeans(ManagedSelector.class).stream().findAny().get();
|
||||
Runnable clientLivelock = new Invocable.NonBlocking()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try { Thread.sleep(10);} catch(Exception e) {}
|
||||
if (busy.get())
|
||||
clientSelector.submit(this);
|
||||
}
|
||||
};
|
||||
clientSelector.submit(clientLivelock);
|
||||
|
||||
ManagedSelector serverSelector = connector.getContainedBeans(ManagedSelector.class).stream().findAny().get();
|
||||
Runnable serverLivelock = new Invocable.NonBlocking()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try { Thread.sleep(10);} catch(Exception e) {}
|
||||
if (busy.get())
|
||||
serverSelector.submit(this);
|
||||
}
|
||||
};
|
||||
serverSelector.submit(serverLivelock);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/" + i)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200)
|
||||
latch.countDown();
|
||||
});
|
||||
Thread.sleep(200);
|
||||
}
|
||||
Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
||||
busy.set(false);
|
||||
}
|
||||
}
|
|
@ -429,7 +429,6 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
_selector.submit(_runUpdateKey);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toEndPointString()
|
||||
{
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||
import org.eclipse.jetty.util.thread.Locker;
|
||||
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
@ -59,6 +58,7 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
|
|||
public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
|
||||
private static final long MAX_ACTION_PERIOD_MS = Long.getLong("org.eclipse.jetty.io.ManagedSelector.MAX_ACTION_PERIOD_MS",100);
|
||||
|
||||
private final Locker _locker = new Locker();
|
||||
private boolean _selecting = false;
|
||||
|
@ -67,6 +67,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
private final int _id;
|
||||
private final ExecutionStrategy _strategy;
|
||||
private Selector _selector;
|
||||
private long _actionTime = -1;
|
||||
|
||||
|
||||
public ManagedSelector(SelectorManager selectorManager, int id)
|
||||
{
|
||||
|
@ -134,6 +136,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
try (Locker.Lock lock = _locker.lock())
|
||||
{
|
||||
_actions.offer(change);
|
||||
|
||||
if (_selecting)
|
||||
{
|
||||
selector = _selector;
|
||||
|
@ -293,7 +296,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
if (action != null)
|
||||
return action;
|
||||
|
||||
update();
|
||||
updateKeys();
|
||||
|
||||
if (!select())
|
||||
return null;
|
||||
|
@ -302,34 +305,41 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
|
||||
private Runnable nextAction()
|
||||
{
|
||||
while (true)
|
||||
Runnable action;
|
||||
long now = System.nanoTime();
|
||||
try (Locker.Lock lock = _locker.lock())
|
||||
{
|
||||
Runnable action;
|
||||
try (Locker.Lock lock = _locker.lock())
|
||||
{
|
||||
action = _actions.poll();
|
||||
if (action == null)
|
||||
{
|
||||
// No more actions, so we need to select
|
||||
_selecting = true;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (Invocable.getInvocationType(action)==InvocationType.BLOCKING)
|
||||
return action;
|
||||
|
||||
try
|
||||
|
||||
// It is important to avoid live-lock (busy blocking) here. If too many actions
|
||||
// are submitted, this can indefinitely defer selection happening. Similarly if
|
||||
// we give too much priority to selection, it may prevent actions from being run.
|
||||
// The solution implemented here is to put a maximum time limit on handling actions
|
||||
// so that this method will fall through to selection if more than MAX_ACTION_PERIOD_MS
|
||||
// is spent running actions. The time period is cleared whenever a selection occurs,
|
||||
// so that a full period can be spent on actions after every select.
|
||||
|
||||
if (_actionTime==-1)
|
||||
_actionTime = now;
|
||||
else if ((now-_actionTime)>TimeUnit.MILLISECONDS.toNanos(100) && _actions.size()>0)
|
||||
{
|
||||
// Too long spent handling actions, give selection a go by,
|
||||
// immediate wakeup (as if remaining action were just added).
|
||||
_selector.wakeup();
|
||||
_selecting = false;
|
||||
_actionTime = -1;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Running action {}", action);
|
||||
// Running the change may queue another action.
|
||||
action.run();
|
||||
LOG.debug("force select q={}",_actions.size());
|
||||
return null;
|
||||
}
|
||||
catch (Throwable x)
|
||||
|
||||
action = _actions.poll();
|
||||
if (action == null)
|
||||
{
|
||||
LOG.debug("Could not run action " + action, x);
|
||||
// No more actions, so we time to do some selecting
|
||||
_selecting = true;
|
||||
_actionTime = -1;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,8 +438,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
return null;
|
||||
}
|
||||
|
||||
private void update()
|
||||
private void updateKeys()
|
||||
{
|
||||
// Do update keys for only previously selected keys.
|
||||
// This will update only those keys whose selection did not cause an
|
||||
// updateKeys action to be submitted.
|
||||
for (SelectionKey key : _keys)
|
||||
updateKey(key);
|
||||
_keys.clear();
|
||||
|
@ -449,16 +462,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private abstract static class NonBlockingAction implements Runnable, Invocable
|
||||
{
|
||||
@Override
|
||||
public final InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
}
|
||||
|
||||
private class DumpKeys extends NonBlockingAction
|
||||
private class DumpKeys extends Invocable.NonBlocking
|
||||
{
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final List<Object> _dumps;
|
||||
|
@ -504,7 +508,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
class Acceptor extends NonBlockingAction implements Selectable, Closeable
|
||||
class Acceptor extends Invocable.NonBlocking implements Selectable, Closeable
|
||||
{
|
||||
private final SelectableChannel _channel;
|
||||
private SelectionKey _key;
|
||||
|
@ -573,7 +577,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
class Accept extends NonBlockingAction implements Closeable
|
||||
class Accept extends Invocable.NonBlocking implements Closeable
|
||||
{
|
||||
private final SelectableChannel channel;
|
||||
private final Object attachment;
|
||||
|
@ -646,7 +650,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
class Connect extends NonBlockingAction
|
||||
class Connect extends Invocable.NonBlocking
|
||||
{
|
||||
private final AtomicBoolean failed = new AtomicBoolean();
|
||||
private final SelectableChannel channel;
|
||||
|
@ -684,7 +688,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private class ConnectTimeout extends NonBlockingAction
|
||||
private class ConnectTimeout extends Invocable.NonBlocking
|
||||
{
|
||||
private final Connect connect;
|
||||
|
||||
|
@ -706,7 +710,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private class CloseEndPoints extends NonBlockingAction
|
||||
private class CloseEndPoints extends Invocable.NonBlocking
|
||||
{
|
||||
private final CountDownLatch _latch = new CountDownLatch(1);
|
||||
private CountDownLatch _allClosed;
|
||||
|
@ -772,7 +776,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private class CloseSelector extends NonBlockingAction
|
||||
private class CloseSelector extends Invocable.NonBlocking
|
||||
{
|
||||
private CountDownLatch _latch = new CountDownLatch(1);
|
||||
|
||||
|
@ -798,7 +802,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private class DestroyEndPoint extends NonBlockingAction implements Closeable
|
||||
private class DestroyEndPoint extends Invocable.NonBlocking implements Closeable
|
||||
{
|
||||
private final EndPoint endPoint;
|
||||
|
||||
|
|
|
@ -18,8 +18,11 @@
|
|||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -35,6 +38,8 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -47,6 +52,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
|||
import org.eclipse.jetty.toolchain.test.OS;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -487,4 +493,61 @@ public class IOTest
|
|||
for (int i=0;i<buffers.length;i++)
|
||||
assertEquals(0,buffers[i].remaining());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSelectorWakeup() throws Exception
|
||||
{
|
||||
ServerSocketChannel connector = ServerSocketChannel.open();
|
||||
connector.bind(null);
|
||||
InetSocketAddress addr=(InetSocketAddress)connector.getLocalAddress();
|
||||
|
||||
SocketChannel client = SocketChannel.open();
|
||||
client.connect(new InetSocketAddress("127.0.0.1",addr.getPort()));
|
||||
|
||||
SocketChannel server = connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
Selector selector = Selector.open();
|
||||
SelectionKey key = server.register(selector,SelectionKey.OP_READ);
|
||||
|
||||
assertThat(key,notNullValue());
|
||||
assertThat(selector.selectNow(), is(0));
|
||||
|
||||
client.write(BufferUtil.toBuffer("X"));
|
||||
assertThat(selector.select(), is(1));
|
||||
assertThat(key.readyOps(), is(SelectionKey.OP_READ));
|
||||
assertThat(selector.selectedKeys(), Matchers.contains(key));
|
||||
|
||||
assertThat(selector.select(), is(0));
|
||||
assertThat(key.readyOps(), is(SelectionKey.OP_READ));
|
||||
assertThat(selector.selectedKeys(), Matchers.contains(key));
|
||||
|
||||
client.write(BufferUtil.toBuffer("X"));
|
||||
selector.selectedKeys().clear();
|
||||
assertThat(selector.select(), is(1));
|
||||
assertThat(key.readyOps(), is(SelectionKey.OP_READ));
|
||||
assertThat(selector.selectedKeys(), Matchers.contains(key));
|
||||
|
||||
ByteBuffer buf = BufferUtil.allocate(1024);
|
||||
int p = BufferUtil.flipToFill(buf);
|
||||
assertThat(server.read(buf),is(2));
|
||||
BufferUtil.flipToFlush(buf,p);
|
||||
|
||||
selector.wakeup();
|
||||
selector.selectedKeys().clear();
|
||||
assertThat(selector.select(), is(0));
|
||||
assertThat(selector.selectedKeys().size(),is(0));
|
||||
|
||||
client.write(BufferUtil.toBuffer("X"));
|
||||
selector.wakeup();
|
||||
selector.selectedKeys().clear();
|
||||
assertThat(selector.select(), is(1));
|
||||
assertThat(selector.selectedKeys().size(),is(1));
|
||||
|
||||
p = BufferUtil.flipToFill(buf);
|
||||
assertThat(server.read(buf),is(1));
|
||||
BufferUtil.flipToFlush(buf,p);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.RejectedExecutionException;
|
|||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||
|
||||
/**
|
||||
* <p>A task (typically either a {@link Runnable} or {@link Callable}
|
||||
|
@ -186,4 +187,13 @@ public interface Invocable
|
|||
return InvocationType.BLOCKING;
|
||||
}
|
||||
|
||||
public abstract class NonBlocking implements Runnable, Invocable
|
||||
{
|
||||
@Override
|
||||
public final InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue