From 42a4ad35da47712039a89a54514b4fa6bbce03b7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Dec 2017 07:45:40 -0500 Subject: [PATCH] Add node name to thread pool executor name This commit adds the node name to the names of thread pool executors so that the node name is visible in rejected execution exception messages. Relates #27663 --- .../service/ClusterApplierService.java | 7 +- .../cluster/service/MasterService.java | 7 +- .../util/concurrent/EsThreadPoolExecutor.java | 21 ++++- .../QueueResizingEsThreadPoolExecutor.java | 47 ++++------ .../discovery/zen/UnicastZenPing.java | 13 +-- .../AutoQueueAdjustingExecutorBuilder.java | 21 +++-- .../threadpool/FixedExecutorBuilder.java | 3 +- .../threadpool/ScalingExecutorBuilder.java | 9 +- .../cluster/service/TaskExecutorTests.java | 2 +- .../util/concurrent/EsExecutorsTests.java | 31 ++++--- .../concurrent/EsThreadPoolExecutorTests.java | 87 +++++++++++++++++++ .../concurrent/PrioritizedExecutorsTests.java | 19 ++-- .../discovery/zen/UnicastZenPingTests.java | 3 +- .../file/FileBasedDiscoveryPlugin.java | 16 ++-- .../test/InternalTestCluster.java | 4 +- 15 files changed, 205 insertions(+), 85 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 13c2e50eba2..9914ee2577a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -133,8 +133,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); Objects.requireNonNull(state.get(), "please set initial state before starting"); addListener(localNodeMasterListeners); - threadPoolExecutor = EsExecutors.newSinglePrioritizing(CLUSTER_UPDATE_THREAD_NAME, - daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); + threadPoolExecutor = EsExecutors.newSinglePrioritizing( + nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME, + daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME), + threadPool.getThreadContext(), + threadPool.scheduler()); } class UpdateTask extends SourcePrioritizedRunnable implements Function { diff --git a/core/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/core/src/main/java/org/elasticsearch/cluster/service/MasterService.java index a5f71dc48b8..6858866d2dc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -104,8 +104,11 @@ public class MasterService extends AbstractLifecycleComponent { protected synchronized void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); - threadPoolExecutor = EsExecutors.newSinglePrioritizing(MASTER_UPDATE_THREAD_NAME, - daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); + threadPoolExecutor = EsExecutors.newSinglePrioritizing( + nodeName() + "/" + MASTER_UPDATE_THREAD_NAME, + daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME), + threadPool.getThreadContext(), + threadPool.scheduler()); taskBatcher = new Batcher(logger, threadPoolExecutor); } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index a1ac182b8dc..8bbf0a59ee0 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -37,7 +37,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { /** * Name used in error reporting. */ - protected final String name; + private final String name; + + final String getName() { + return name; + } EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) { @@ -138,15 +142,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { } @Override - public String toString() { + public final String toString() { StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()).append('['); - b.append(name).append(", "); + b.append("name = ").append(name).append(", "); if (getQueue() instanceof SizeBlockingQueue) { @SuppressWarnings("rawtypes") SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); b.append("queue capacity = ").append(queue.capacity()).append(", "); } + appendThreadPoolExecutorDetails(b); /* * ThreadPoolExecutor has some nice information in its toString but we * can't get at it easily without just getting the toString. @@ -155,6 +160,16 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { return b.toString(); } + /** + * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in + * the form "%s = %s, " + * + * @param sb the {@link StringBuilder} to append to + */ + protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { + + } + protected Runnable wrapRunnable(Runnable command) { return contextHolder.preserveContext(command); } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 8062d5510c7..e929192b5dd 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -22,21 +22,16 @@ package org.elasticsearch.common.util.concurrent; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue; import java.util.Locale; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Stream; /** * An extension to thread pool executor, which automatically adjusts the queue size of the @@ -80,8 +75,8 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); - logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", - name, QUEUE_ADJUSTMENT_AMOUNT); + logger.debug( + "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), QUEUE_ADJUSTMENT_AMOUNT); } @Override @@ -180,7 +175,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto final long avgTaskTime = totalNanos / tasksPerFrame; logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " + "[{} tasks/s], optimal queue is [{}], current capacity [{}]", - name, + getName(), tasksPerFrame, TimeValue.timeValueNanos(totalRuntime), TimeValue.timeValueNanos(avgTaskTime), @@ -196,7 +191,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto final int newCapacity = workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize); if (oldCapacity != newCapacity && logger.isDebugEnabled()) { - logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name, + logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(), newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT, oldCapacity, newCapacity); } @@ -205,7 +200,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto logger.warn((Supplier) () -> new ParameterizedMessage( "failed to calculate optimal queue size for [{}] thread pool, " + "total frame time [{}ns], tasks [{}], task execution time [{}ns]", - name, totalRuntime, tasksPerFrame, totalNanos), + getName(), totalRuntime, tasksPerFrame, totalNanos), e); } finally { // Finally, decrement the task count and time back to their starting values. We @@ -224,7 +219,8 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15 // - Since taskCount will now be incremented forever, it will never be 10 again, // so there will be no further adjustments - logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name); + logger.debug( + "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName()); totalTaskNanos.getAndSet(1); taskCount.getAndSet(0); startNs = System.nanoTime(); @@ -237,26 +233,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto } @Override - public String toString() { - StringBuilder b = new StringBuilder(); - b.append(getClass().getSimpleName()).append('['); - b.append(name).append(", "); - - @SuppressWarnings("rawtypes") - ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue(); - - b.append("queue capacity = ").append(getCurrentCapacity()).append(", "); - b.append("min queue capacity = ").append(minQueueSize).append(", "); - b.append("max queue capacity = ").append(maxQueueSize).append(", "); - b.append("frame size = ").append(tasksPerFrame).append(", "); - b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", "); - b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", "); - b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); - /* - * ThreadPoolExecutor has some nice information in its toString but we - * can't get at it easily without just getting the toString. - */ - b.append(super.toString()).append(']'); - return b.toString(); + protected void appendThreadPoolExecutorDetails(StringBuilder sb) { + sb.append("min queue capacity = ").append(minQueueSize).append(", "); + sb.append("max queue capacity = ").append(maxQueueSize).append(", "); + sb.append("frame size = ").append(tasksPerFrame).append(", "); + sb.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", "); + sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", "); + sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 06269706e0d..54cdb7caeaa 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -167,12 +167,13 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); unicastZenPingExecutorService = EsExecutors.newScaling( - "unicast_connect", - 0, concurrentConnects, - 60, - TimeUnit.SECONDS, - threadFactory, - threadPool.getThreadContext()); + nodeName() + "/" + "unicast_connect", + 0, + concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); } /** diff --git a/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java index 265e544d281..ec9d95c722d 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java +++ b/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java @@ -19,23 +19,14 @@ package org.elasticsearch.threadpool; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.Node; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Locale; @@ -121,8 +112,16 @@ public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder { try { @@ -360,7 +370,8 @@ public class EsExecutorsTests extends ESTestCase { int queue = between(0, 100); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch executed = new CountDownLatch(1); - EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext); + EsThreadPoolExecutor executor = + EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext); try { Runnable r = () -> { latch.countDown(); @@ -379,6 +390,6 @@ public class EsExecutorsTests extends ESTestCase { latch.countDown(); terminate(executor); } - } + } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java new file mode 100644 index 00000000000..9b9aa50bd16 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; + +public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put("node.name", "es-thread-pool-executor-tests") + .put("thread_pool.bulk.size", 1) + .put("thread_pool.bulk.queue_size", 0) + .put("thread_pool.search.size", 1) + .put("thread_pool.search.queue_size", 1) + .build(); + } + + public void testRejectedExecutionExceptionContainsNodeName() { + // we test a fixed and an auto-queue executor but not scaling since it does not reject + runThreadPoolExecutorTest(1, ThreadPool.Names.BULK); + runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH); + + } + + private void runThreadPoolExecutorTest(final int fill, final String executor) { + final CountDownLatch latch = new CountDownLatch(fill); + for (int i = 0; i < fill; i++) { + node().injector().getInstance(ThreadPool.class).executor(executor).execute(() -> { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + final AtomicBoolean rejected = new AtomicBoolean(); + node().injector().getInstance(ThreadPool.class).executor(executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + + } + + @Override + public void onRejection(Exception e) { + rejected.set(true); + assertThat(e, hasToString(containsString("name = es-thread-pool-executor-tests/" + executor + ", "))); + } + + @Override + protected void doRun() throws Exception { + + } + }); + + latch.countDown(); + assertTrue(rejected.get()); + } + +} diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 17b43a079dc..1eacb4cb18c 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -45,6 +45,10 @@ public class PrioritizedExecutorsTests extends ESTestCase { private final ThreadContext holder = new ThreadContext(Settings.EMPTY); + private String getName() { + return getClass().getName() + "/" + getTestName(); + } + public void testPriorityQueue() throws Exception { PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); List priorities = Arrays.asList(Priority.values()); @@ -65,7 +69,8 @@ public class PrioritizedExecutorsTests extends ESTestCase { } public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); + ExecutorService executor = + EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -94,7 +99,8 @@ public class PrioritizedExecutorsTests extends ESTestCase { } public void testExecutePrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); + ExecutorService executor = + EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -123,7 +129,8 @@ public class PrioritizedExecutorsTests extends ESTestCase { } public void testSubmitPrioritizedExecutorWithCallables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); + ExecutorService executor = + EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -182,7 +189,8 @@ public class PrioritizedExecutorsTests extends ESTestCase { public void testTimeout() throws Exception { ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); + PrioritizedEsThreadPoolExecutor executor = + EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1); executor.execute(new Runnable() { @@ -245,7 +253,8 @@ public class PrioritizedExecutorsTests extends ESTestCase { ThreadPool threadPool = new TestThreadPool("test"); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final AtomicBoolean timeoutCalled = new AtomicBoolean(); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); + PrioritizedEsThreadPoolExecutor executor = + EsExecutors.newSinglePrioritizing(getName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); final CountDownLatch invoked = new CountDownLatch(1); executor.execute(new Runnable() { @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 853294de186..d40d558d20b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -112,7 +112,8 @@ public class UnicastZenPingTests extends ESTestCase { threadPool = new TestThreadPool(getClass().getName()); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]"); executorService = - EsExecutors.newScaling(getClass().getName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); + EsExecutors.newScaling( + getClass().getName() + "/" + getTestName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); closeables = new Stack<>(); } diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java index b5d16a547d5..a8f3337d50d 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -29,11 +29,11 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.node.Node; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -78,13 +78,13 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]"); fileBasedDiscoveryExecutorService = EsExecutors.newScaling( - "file_based_discovery_resolve", - 0, - concurrentConnects, - 60, - TimeUnit.SECONDS, - threadFactory, - threadPool.getThreadContext()); + Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve", + 0, + concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); return Collections.emptyList(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ea9b17f10e3..0b001de69cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -138,8 +138,8 @@ import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOU import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; -import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.getTestTransportType; +import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -349,7 +349,7 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomNumbers.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newScaling("test runner", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); + executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); } @Override