Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2017-10-30 12:58:48 +01:00
commit 108d56707b
2 changed files with 55 additions and 34 deletions

View File

@ -25,18 +25,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class LivelockTest public class LivelockTest
@ -61,19 +58,20 @@ public class LivelockTest
@After @After
public void after() throws Exception public void after() throws Exception
{ {
if (client != null)
client.stop(); client.stop();
if (server != null)
server.stop(); server.stop();
} }
@Test @Test
@Slow
public void testLivelock() throws Exception public void testLivelock() throws Exception
{ {
// This test applies a moderate connect/request load (5/s) over 5 seconds, // This test applies a moderate connect/request load (5/s) over 5 seconds,
// with a connect timeout of 100, so any delayed connects will be detected. // with a connect timeout of 1000, so any delayed connects will be detected.
// NonBlocking runnables are submitted to both the client and server // NonBlocking actions are submitted to both the client and server
// ManagedSelectors that submit themselves in an attempt to cause a live lock // ManagedSelectors that submit themselves in an attempt to cause a live lock
// as there will always be an action available to run // as there will always be an action available to run.
int count = 25; int count = 25;
HttpClientTransport transport = new HttpClientTransportOverHTTP(1); HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
@ -96,7 +94,7 @@ public class LivelockTest
@Override @Override
public void run() public void run()
{ {
try { Thread.sleep(10);} catch(Exception e) {} sleep(10);
if (busy.get()) if (busy.get())
clientSelector.submit(this); clientSelector.submit(this);
} }
@ -109,13 +107,15 @@ public class LivelockTest
@Override @Override
public void run() public void run()
{ {
try { Thread.sleep(10);} catch(Exception e) {} sleep(10);
if (busy.get()) if (busy.get())
serverSelector.submit(this); serverSelector.submit(this);
} }
}; };
serverSelector.submit(serverLivelock); serverSelector.submit(serverLivelock);
int requestRate = 5;
long pause = 1000 / requestRate;
CountDownLatch latch = new CountDownLatch(count); CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)
{ {
@ -126,9 +126,23 @@ public class LivelockTest
if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200) if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown(); latch.countDown();
}); });
Thread.sleep(200); sleep(pause);
} }
Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); Assert.assertTrue(latch.await(2 * pause * count, TimeUnit.MILLISECONDS));
// Exit the livelocks.
busy.set(false); busy.set(false);
} }
private void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
} }

View File

@ -69,7 +69,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private Selector _selector; private Selector _selector;
private long _actionTime = -1; private long _actionTime = -1;
public ManagedSelector(SelectorManager selectorManager, int id) public ManagedSelector(SelectorManager selectorManager, int id)
{ {
_selectorManager = selectorManager; _selectorManager = selectorManager;
@ -305,11 +304,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private Runnable nextAction() private Runnable nextAction()
{ {
Runnable action;
long now = System.nanoTime(); long now = System.nanoTime();
Selector selector = null;
Runnable action = null;
try (Locker.Lock lock = _locker.lock()) try (Locker.Lock lock = _locker.lock())
{ {
// It is important to avoid live-lock (busy blocking) here. If too many actions // It is important to avoid live-lock (busy blocking) here. If too many actions
// are submitted, this can indefinitely defer selection happening. Similarly if // are submitted, this can indefinitely defer selection happening. Similarly if
// we give too much priority to selection, it may prevent actions from being run. // we give too much priority to selection, it may prevent actions from being run.
@ -318,20 +317,23 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// is spent running actions. The time period is cleared whenever a selection occurs, // is spent running actions. The time period is cleared whenever a selection occurs,
// so that a full period can be spent on actions after every select. // so that a full period can be spent on actions after every select.
if (_actionTime==-1) if (_actionTime == -1)
_actionTime = now;
else if ((now-_actionTime)>TimeUnit.MILLISECONDS.toNanos(100) && _actions.size()>0)
{ {
// Too long spent handling actions, give selection a go by, _actionTime = now;
// immediate wakeup (as if remaining action were just added). }
_selector.wakeup(); else if ((now - _actionTime) > TimeUnit.MILLISECONDS.toNanos(MAX_ACTION_PERIOD_MS) && _actions.size() > 0)
{
// Too much time spent handling actions, give selection a go,
// immediately waking up (as if remaining action were just added).
selector = _selector;
_selecting = false; _selecting = false;
_actionTime = -1; _actionTime = -1;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("force select q={}",_actions.size()); LOG.debug("Forcing selection, actions={}",_actions.size());
return null;
} }
if (selector == null)
{
action = _actions.poll(); action = _actions.poll();
if (action == null) if (action == null)
{ {
@ -339,10 +341,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_selecting = true; _selecting = true;
_actionTime = -1; _actionTime = -1;
} }
return action;
} }
} }
if (selector != null)
selector.wakeup();
return action;
}
private boolean select() private boolean select()
{ {
try try
@ -611,7 +618,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private class CreateEndPoint implements Runnable, Invocable, Closeable private class CreateEndPoint extends Invocable.NonBlocking implements Closeable
{ {
private final SelectableChannel channel; private final SelectableChannel channel;
private final SelectionKey key; private final SelectionKey key;