Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-11.0.x

This commit is contained in:
Greg Wilkins 2022-06-30 16:38:29 +10:00
commit e285b80f45
2 changed files with 118 additions and 21 deletions

View File

@ -243,9 +243,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
{
// Fill the job queue with noop jobs to wakeup idle threads.
for (int i = 0; i < threads; ++i)
{
jobs.offer(NOOP);
}
if (!jobs.offer(NOOP))
break;
// try to let jobs complete naturally for half our stop time
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
@ -255,6 +254,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
// interrupt remaining threads
for (Thread thread : _threads)
{
if (thread == Thread.currentThread())
continue;
if (LOG.isDebugEnabled())
LOG.debug("Interrupting {}", thread);
thread.interrupt();
@ -264,24 +265,21 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
Thread.yield();
if (LOG.isDebugEnabled())
for (Thread unstopped : _threads)
{
for (Thread unstopped : _threads)
if (unstopped == Thread.currentThread())
continue;
String stack = "";
if (LOG.isDebugEnabled())
{
StringBuilder dmp = new StringBuilder();
for (StackTraceElement element : unstopped.getStackTrace())
{
dmp.append(System.lineSeparator()).append("\tat ").append(element);
}
LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
}
}
else
{
for (Thread unstopped : _threads)
{
LOG.warn("{} Couldn't stop {}", this, unstopped);
stack = dmp.toString();
}
LOG.warn("Couldn't stop {}{}", unstopped, stack);
}
}
@ -315,13 +313,32 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private void joinThreads(long stopByNanos) throws InterruptedException
{
for (Thread thread : _threads)
loop : while (true)
{
long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Waiting for {} for {}", thread, canWait);
if (canWait > 0)
thread.join(canWait);
for (Thread thread : _threads)
{
// Don't join ourselves
if (thread == Thread.currentThread())
continue;
long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Waiting for {} for {}", thread, canWait);
if (canWait <= 0)
return;
try
{
thread.join(canWait);
}
catch (InterruptedException e)
{
// Don't stop waiting for a join if interrupted
continue loop;
}
}
return;
}
}

View File

@ -101,6 +101,43 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
}
private static class StoppingTask implements Runnable
{
private final CountDownLatch _running;
private final CountDownLatch _blocked;
private final QueuedThreadPool _tp;
Thread _thread;
CountDownLatch _completed = new CountDownLatch(1);
public StoppingTask(CountDownLatch running, CountDownLatch blocked, QueuedThreadPool tp)
{
_running = running;
_blocked = blocked;
_tp = tp;
}
@Override
public void run()
{
try
{
_thread = Thread.currentThread();
_running.countDown();
_blocked.await();
_tp.doStop();
_completed.countDown();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
private class RunningJob implements Runnable
{
final CountDownLatch _run = new CountDownLatch(1);
@ -947,6 +984,49 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
}
@Test
public void testInterruptedStop() throws Exception
{
QueuedThreadPool tp = new QueuedThreadPool();
tp.setStopTimeout(1000);
tp.start();
CountDownLatch running = new CountDownLatch(3);
CountDownLatch blocked = new CountDownLatch(1);
CountDownLatch forever = new CountDownLatch(2);
CountDownLatch interrupted = new CountDownLatch(1);
Runnable runForever = () ->
{
try
{
running.countDown();
forever.await();
}
catch (InterruptedException x)
{
interrupted.countDown();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
};
StoppingTask stopping = new StoppingTask(running, blocked, tp);
tp.execute(runForever);
tp.execute(stopping);
tp.execute(runForever);
assertTrue(running.await(5, TimeUnit.SECONDS));
blocked.countDown();
Thread.sleep(100); // wait until in doStop, then....
stopping._thread.interrupt(); // spurious interrupt
assertTrue(interrupted.await(5, TimeUnit.SECONDS));
assertTrue(stopping._completed.await(5, TimeUnit.SECONDS));
}
private int count(String s, String p)
{
int c = 0;