Add node name to thread pool executor name
This commit adds the node name to the names of thread pool executors so that the node name is visible in rejected execution exception messages. Relates #27663
This commit is contained in:
parent
144e1698cf
commit
42a4ad35da
|
@ -133,8 +133,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
|
||||
Objects.requireNonNull(state.get(), "please set initial state before starting");
|
||||
addListener(localNodeMasterListeners);
|
||||
threadPoolExecutor = EsExecutors.newSinglePrioritizing(CLUSTER_UPDATE_THREAD_NAME,
|
||||
daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
|
||||
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
|
||||
nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME,
|
||||
daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME),
|
||||
threadPool.getThreadContext(),
|
||||
threadPool.scheduler());
|
||||
}
|
||||
|
||||
class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
|
||||
|
|
|
@ -104,8 +104,11 @@ public class MasterService extends AbstractLifecycleComponent {
|
|||
protected synchronized void doStart() {
|
||||
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
|
||||
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
|
||||
threadPoolExecutor = EsExecutors.newSinglePrioritizing(MASTER_UPDATE_THREAD_NAME,
|
||||
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
|
||||
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
|
||||
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
|
||||
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
|
||||
threadPool.getThreadContext(),
|
||||
threadPool.scheduler());
|
||||
taskBatcher = new Batcher(logger, threadPoolExecutor);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||
/**
|
||||
* Name used in error reporting.
|
||||
*/
|
||||
protected final String name;
|
||||
private final String name;
|
||||
|
||||
final String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
||||
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
|
||||
|
@ -138,15 +142,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
public final String toString() {
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append(getClass().getSimpleName()).append('[');
|
||||
b.append(name).append(", ");
|
||||
b.append("name = ").append(name).append(", ");
|
||||
if (getQueue() instanceof SizeBlockingQueue) {
|
||||
@SuppressWarnings("rawtypes")
|
||||
SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
|
||||
b.append("queue capacity = ").append(queue.capacity()).append(", ");
|
||||
}
|
||||
appendThreadPoolExecutorDetails(b);
|
||||
/*
|
||||
* ThreadPoolExecutor has some nice information in its toString but we
|
||||
* can't get at it easily without just getting the toString.
|
||||
|
@ -155,6 +160,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||
return b.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
|
||||
* the form "%s = %s, "
|
||||
*
|
||||
* @param sb the {@link StringBuilder} to append to
|
||||
*/
|
||||
protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {
|
||||
|
||||
}
|
||||
|
||||
protected Runnable wrapRunnable(Runnable command) {
|
||||
return contextHolder.preserveContext(command);
|
||||
}
|
||||
|
|
|
@ -22,21 +22,16 @@ package org.elasticsearch.common.util.concurrent;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* An extension to thread pool executor, which automatically adjusts the queue size of the
|
||||
|
@ -80,8 +75,8 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
this.maxQueueSize = maxQueueSize;
|
||||
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
|
||||
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
|
||||
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
|
||||
name, QUEUE_ADJUSTMENT_AMOUNT);
|
||||
logger.debug(
|
||||
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), QUEUE_ADJUSTMENT_AMOUNT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,7 +175,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
final long avgTaskTime = totalNanos / tasksPerFrame;
|
||||
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
|
||||
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
|
||||
name,
|
||||
getName(),
|
||||
tasksPerFrame,
|
||||
TimeValue.timeValueNanos(totalRuntime),
|
||||
TimeValue.timeValueNanos(avgTaskTime),
|
||||
|
@ -196,7 +191,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
final int newCapacity =
|
||||
workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
|
||||
if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
|
||||
logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name,
|
||||
logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
|
||||
newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
|
||||
oldCapacity, newCapacity);
|
||||
}
|
||||
|
@ -205,7 +200,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
||||
"failed to calculate optimal queue size for [{}] thread pool, " +
|
||||
"total frame time [{}ns], tasks [{}], task execution time [{}ns]",
|
||||
name, totalRuntime, tasksPerFrame, totalNanos),
|
||||
getName(), totalRuntime, tasksPerFrame, totalNanos),
|
||||
e);
|
||||
} finally {
|
||||
// Finally, decrement the task count and time back to their starting values. We
|
||||
|
@ -224,7 +219,8 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
// - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
|
||||
// - Since taskCount will now be incremented forever, it will never be 10 again,
|
||||
// so there will be no further adjustments
|
||||
logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name);
|
||||
logger.debug(
|
||||
"[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
|
||||
totalTaskNanos.getAndSet(1);
|
||||
taskCount.getAndSet(0);
|
||||
startNs = System.nanoTime();
|
||||
|
@ -237,26 +233,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
|
|||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append(getClass().getSimpleName()).append('[');
|
||||
b.append(name).append(", ");
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue();
|
||||
|
||||
b.append("queue capacity = ").append(getCurrentCapacity()).append(", ");
|
||||
b.append("min queue capacity = ").append(minQueueSize).append(", ");
|
||||
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
|
||||
b.append("frame size = ").append(tasksPerFrame).append(", ");
|
||||
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
|
||||
b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", ");
|
||||
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
|
||||
/*
|
||||
* ThreadPoolExecutor has some nice information in its toString but we
|
||||
* can't get at it easily without just getting the toString.
|
||||
*/
|
||||
b.append(super.toString()).append(']');
|
||||
return b.toString();
|
||||
protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
|
||||
sb.append("min queue capacity = ").append(minQueueSize).append(", ");
|
||||
sb.append("max queue capacity = ").append(maxQueueSize).append(", ");
|
||||
sb.append("frame size = ").append(tasksPerFrame).append(", ");
|
||||
sb.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
|
||||
sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", ");
|
||||
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -167,12 +167,13 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
|
||||
unicastZenPingExecutorService = EsExecutors.newScaling(
|
||||
"unicast_connect",
|
||||
0, concurrentConnects,
|
||||
60,
|
||||
TimeUnit.SECONDS,
|
||||
threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
nodeName() + "/" + "unicast_connect",
|
||||
0,
|
||||
concurrentConnects,
|
||||
60,
|
||||
TimeUnit.SECONDS,
|
||||
threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,23 +19,14 @@
|
|||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -121,8 +112,16 @@ public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder<Aut
|
|||
TimeValue targetedResponseTime = settings.targetedResponseTime;
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
final ExecutorService executor =
|
||||
EsExecutors.newAutoQueueFixed(name(), size, initialQueueSize, minQueueSize,
|
||||
maxQueueSize, frameSize, targetedResponseTime, threadFactory, threadContext);
|
||||
EsExecutors.newAutoQueueFixed(
|
||||
settings.nodeName + "/" + name(),
|
||||
size,
|
||||
initialQueueSize,
|
||||
minQueueSize,
|
||||
maxQueueSize,
|
||||
frameSize,
|
||||
targetedResponseTime,
|
||||
threadFactory,
|
||||
threadContext);
|
||||
// TODO: in a subsequent change we hope to extend ThreadPool.Info to be more specific for the thread pool type
|
||||
final ThreadPool.Info info =
|
||||
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE,
|
||||
|
|
|
@ -94,7 +94,8 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
|
|||
int size = settings.size;
|
||||
int queueSize = settings.queueSize;
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
final ExecutorService executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext);
|
||||
final ExecutorService executor =
|
||||
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
|
||||
final ThreadPool.Info info =
|
||||
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
|
||||
return new ThreadPool.ExecutorHolder(executor, info);
|
||||
|
|
|
@ -97,7 +97,14 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
|
|||
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
final ExecutorService executor =
|
||||
EsExecutors.newScaling(name(), core, max, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
|
||||
EsExecutors.newScaling(
|
||||
settings.nodeName + "/" + name(),
|
||||
core,
|
||||
max,
|
||||
keepAlive.millis(),
|
||||
TimeUnit.MILLISECONDS,
|
||||
threadFactory,
|
||||
threadContext);
|
||||
return new ThreadPool.ExecutorHolder(executor, info);
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ public class TaskExecutorTests extends ESTestCase {
|
|||
|
||||
@Before
|
||||
public void setUpExecutor() {
|
||||
threadExecutor = EsExecutors.newSinglePrioritizing("test_thread",
|
||||
threadExecutor = EsExecutors.newSinglePrioritizing(getClass().getName() + "/" + getTestName(),
|
||||
daemonThreadFactory(Settings.EMPTY, "test_thread"), threadPool.getThreadContext(), threadPool.scheduler());
|
||||
}
|
||||
|
||||
|
|
|
@ -44,8 +44,13 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
return TimeUnit.values()[between(0, TimeUnit.values().length - 1)];
|
||||
}
|
||||
|
||||
private String getName() {
|
||||
return getClass().getName() + "/" + getTestName();
|
||||
}
|
||||
|
||||
public void testFixedForcedExecution() throws Exception {
|
||||
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
EsThreadPoolExecutor executor =
|
||||
EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
final CountDownLatch wait = new CountDownLatch(1);
|
||||
|
||||
final CountDownLatch exec1Wait = new CountDownLatch(1);
|
||||
|
@ -107,7 +112,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFixedRejected() throws Exception {
|
||||
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
EsThreadPoolExecutor executor =
|
||||
EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
final CountDownLatch wait = new CountDownLatch(1);
|
||||
|
||||
final CountDownLatch exec1Wait = new CountDownLatch(1);
|
||||
|
@ -165,7 +171,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
final int max = between(min + 1, 6);
|
||||
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
|
||||
|
||||
ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
ThreadPoolExecutor pool =
|
||||
EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
|
||||
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
|
||||
|
||||
|
@ -201,7 +208,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
final int max = between(min + 1, 6);
|
||||
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
|
||||
|
||||
final ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
final ThreadPoolExecutor pool =
|
||||
EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"), threadContext);
|
||||
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
|
||||
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
|
||||
|
||||
|
@ -241,7 +249,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
int queue = between(0, 100);
|
||||
int actions = queue + pool;
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
EsThreadPoolExecutor executor =
|
||||
EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
try {
|
||||
for (int i = 0; i < actions; i++) {
|
||||
executor.execute(new Runnable() {
|
||||
|
@ -272,7 +281,7 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
assertFalse("Thread pool registering as terminated when it isn't", e.isExecutorShutdown());
|
||||
String message = ExceptionsHelper.detailedMessage(e);
|
||||
assertThat(message, containsString("of dummy runnable"));
|
||||
assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage"));
|
||||
assertThat(message, containsString("on EsThreadPoolExecutor[name = " + getName()));
|
||||
assertThat(message, containsString("queue capacity = " + queue));
|
||||
assertThat(message, containsString("[Running"));
|
||||
/*
|
||||
|
@ -312,7 +321,7 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown());
|
||||
String message = ExceptionsHelper.detailedMessage(e);
|
||||
assertThat(message, containsString("of dummy runnable"));
|
||||
assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName()));
|
||||
assertThat(message, containsString("on EsThreadPoolExecutor[name = " + getName()));
|
||||
assertThat(message, containsString("queue capacity = " + queue));
|
||||
assertThat(message, containsString("[Terminated"));
|
||||
assertThat(message, containsString("active threads = 0"));
|
||||
|
@ -330,7 +339,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
threadContext.putHeader("foo", "bar");
|
||||
final Integer one = new Integer(1);
|
||||
threadContext.putTransient("foo", one);
|
||||
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
EsThreadPoolExecutor executor =
|
||||
EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
try {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
|
@ -360,7 +370,8 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
int queue = between(0, 100);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CountDownLatch executed = new CountDownLatch(1);
|
||||
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
EsThreadPoolExecutor executor =
|
||||
EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
|
||||
try {
|
||||
Runnable r = () -> {
|
||||
latch.countDown();
|
||||
|
@ -379,6 +390,6 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
latch.countDown();
|
||||
terminate(executor);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
|
||||
public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("node.name", "es-thread-pool-executor-tests")
|
||||
.put("thread_pool.bulk.size", 1)
|
||||
.put("thread_pool.bulk.queue_size", 0)
|
||||
.put("thread_pool.search.size", 1)
|
||||
.put("thread_pool.search.queue_size", 1)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testRejectedExecutionExceptionContainsNodeName() {
|
||||
// we test a fixed and an auto-queue executor but not scaling since it does not reject
|
||||
runThreadPoolExecutorTest(1, ThreadPool.Names.BULK);
|
||||
runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH);
|
||||
|
||||
}
|
||||
|
||||
private void runThreadPoolExecutorTest(final int fill, final String executor) {
|
||||
final CountDownLatch latch = new CountDownLatch(fill);
|
||||
for (int i = 0; i < fill; i++) {
|
||||
node().injector().getInstance(ThreadPool.class).executor(executor).execute(() -> {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final AtomicBoolean rejected = new AtomicBoolean();
|
||||
node().injector().getInstance(ThreadPool.class).executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRejection(Exception e) {
|
||||
rejected.set(true);
|
||||
assertThat(e, hasToString(containsString("name = es-thread-pool-executor-tests/" + executor + ", ")));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
latch.countDown();
|
||||
assertTrue(rejected.get());
|
||||
}
|
||||
|
||||
}
|
|
@ -45,6 +45,10 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
|
||||
private final ThreadContext holder = new ThreadContext(Settings.EMPTY);
|
||||
|
||||
private String getName() {
|
||||
return getClass().getName() + "/" + getTestName();
|
||||
}
|
||||
|
||||
public void testPriorityQueue() throws Exception {
|
||||
PriorityBlockingQueue<Priority> queue = new PriorityBlockingQueue<>();
|
||||
List<Priority> priorities = Arrays.asList(Priority.values());
|
||||
|
@ -65,7 +69,8 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
|
||||
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
ExecutorService executor =
|
||||
EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
List<Integer> results = new ArrayList<>(8);
|
||||
CountDownLatch awaitingLatch = new CountDownLatch(1);
|
||||
CountDownLatch finishedLatch = new CountDownLatch(8);
|
||||
|
@ -94,7 +99,8 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
|
||||
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
ExecutorService executor =
|
||||
EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
List<Integer> results = new ArrayList<>(8);
|
||||
CountDownLatch awaitingLatch = new CountDownLatch(1);
|
||||
CountDownLatch finishedLatch = new CountDownLatch(8);
|
||||
|
@ -123,7 +129,8 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
|
||||
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
ExecutorService executor =
|
||||
EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
|
||||
List<Integer> results = new ArrayList<>(8);
|
||||
CountDownLatch awaitingLatch = new CountDownLatch(1);
|
||||
CountDownLatch finishedLatch = new CountDownLatch(8);
|
||||
|
@ -182,7 +189,8 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
|
||||
public void testTimeout() throws Exception {
|
||||
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName()));
|
||||
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
|
||||
PrioritizedEsThreadPoolExecutor executor =
|
||||
EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
|
||||
final CountDownLatch invoked = new CountDownLatch(1);
|
||||
final CountDownLatch block = new CountDownLatch(1);
|
||||
executor.execute(new Runnable() {
|
||||
|
@ -245,7 +253,8 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
|||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
|
||||
final AtomicBoolean timeoutCalled = new AtomicBoolean();
|
||||
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
|
||||
PrioritizedEsThreadPoolExecutor executor =
|
||||
EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
|
||||
final CountDownLatch invoked = new CountDownLatch(1);
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
|
|
|
@ -112,7 +112,8 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
threadPool = new TestThreadPool(getClass().getName());
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]");
|
||||
executorService =
|
||||
EsExecutors.newScaling(getClass().getName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
|
||||
EsExecutors.newScaling(
|
||||
getClass().getName() + "/" + getTestName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
|
||||
closeables = new Stack<>();
|
||||
}
|
||||
|
||||
|
|
|
@ -29,11 +29,11 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
@ -78,13 +78,13 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin
|
|||
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
|
||||
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
|
||||
"file_based_discovery_resolve",
|
||||
0,
|
||||
concurrentConnects,
|
||||
60,
|
||||
TimeUnit.SECONDS,
|
||||
threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
|
||||
0,
|
||||
concurrentConnects,
|
||||
60,
|
||||
TimeUnit.SECONDS,
|
||||
threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -138,8 +138,8 @@ import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOU
|
|||
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||
import static org.elasticsearch.test.ESTestCase.assertBusy;
|
||||
import static org.elasticsearch.test.ESTestCase.awaitBusy;
|
||||
import static org.elasticsearch.test.ESTestCase.randomFrom;
|
||||
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
|
||||
import static org.elasticsearch.test.ESTestCase.randomFrom;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -349,7 +349,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
// always reduce this - it can make tests really slow
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomNumbers.randomIntBetween(random, 20, 50)));
|
||||
defaultSettings = builder.build();
|
||||
executor = EsExecutors.newScaling("test runner", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
|
||||
executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue