453386 - Jetty not working when configuring QueuedThreadPool with minThreads=0.
Fixed by properly initializing the queue capacity and by making sure that if minThreads=0 at least one thread is always started to handle the job.
This commit is contained in:
parent
981e27b383
commit
ae3e491f26
|
@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
@ -39,6 +38,7 @@ import org.eclipse.jetty.util.annotation.Name;
|
|||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||
|
@ -90,9 +90,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
setStopTimeout(5000);
|
||||
|
||||
if (queue==null)
|
||||
queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
|
||||
{
|
||||
int capacity=Math.max(_minThreads, 8);
|
||||
queue=new BlockingArrayQueue<>(capacity, capacity);
|
||||
}
|
||||
_jobs=queue;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,7 +167,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
StringBuilder dmp = new StringBuilder();
|
||||
for (StackTraceElement element : unstopped.getStackTrace())
|
||||
{
|
||||
dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
|
||||
dmp.append(System.lineSeparator()).append("\tat ").append(element);
|
||||
}
|
||||
LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
|
||||
}
|
||||
|
@ -359,6 +361,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
LOG.warn("{} rejected {}", this, job);
|
||||
throw new RejectedExecutionException(job.toString());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Make sure there is at least one thread executing the job.
|
||||
if (getThreads() == 0)
|
||||
startThreads(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -429,14 +437,13 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
|
||||
thread.start();
|
||||
started = true;
|
||||
--threadsToStart;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!started)
|
||||
_threadsStarted.decrementAndGet();
|
||||
}
|
||||
if (started)
|
||||
threadsToStart--;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -446,7 +453,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
return new Thread(runnable);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@ManagedOperation("dump thread state")
|
||||
public String dump()
|
||||
|
@ -481,8 +487,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
{
|
||||
out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
|
||||
if (thread.getPriority()!=Thread.NORM_PRIORITY)
|
||||
out.append(" prio="+thread.getPriority());
|
||||
out.append('\n');
|
||||
out.append(" prio=").append(String.valueOf(thread.getPriority()));
|
||||
out.append(System.lineSeparator());
|
||||
if (!idle)
|
||||
ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
|
||||
}
|
||||
|
@ -666,14 +672,13 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
if (thread.getId() == id)
|
||||
{
|
||||
StringBuilder buf = new StringBuilder();
|
||||
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
|
||||
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
|
||||
buf.append(thread.getState()).append(":").append(System.lineSeparator());
|
||||
for (StackTraceElement element : thread.getStackTrace())
|
||||
buf.append(" at ").append(element.toString()).append('\n');
|
||||
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -18,11 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -31,11 +26,15 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
|||
import org.eclipse.jetty.toolchain.test.annotation.Slow;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.StdErrLog;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(AdvancedRunner.class)
|
||||
public class QueuedThreadPoolTest
|
||||
{
|
||||
|
@ -288,4 +287,25 @@ public class QueuedThreadPoolTest
|
|||
((StdErrLog)Log.getLogger(QueuedThreadPool.class)).setHideStacks(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZeroMinThreads() throws Exception
|
||||
{
|
||||
int maxThreads = 10;
|
||||
int minThreads = 0;
|
||||
QueuedThreadPool pool = new QueuedThreadPool(maxThreads, minThreads);
|
||||
pool.start();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
pool.execute(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue