diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 9bd68b82429..df88bc7b366 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.service; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchInterruptedException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.UnmodifiableIterator; @@ -33,7 +34,11 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.none.NoneGateway; -import org.elasticsearch.index.*; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.CloseableIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexShardAlreadyExistsException; +import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; @@ -73,6 +78,7 @@ import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.Maps.*; @@ -216,14 +222,15 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexEngine; } - public void close(final boolean delete, final String reason) { + public void close(final boolean delete, final String reason, @Nullable Executor executor) { synchronized (this) { closed = true; } Set shardIds = shardIds(); final CountDownLatch latch = new CountDownLatch(shardIds.size()); for (final int shardId : shardIds) { - threadPool.cached().execute(new Runnable() { + executor = executor == null ? threadPool.cached() : executor; + executor.execute(new Runnable() { @Override public void run() { try { deleteShard(shardId, delete, !delete, delete, reason); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index acec7fbe98e..22bd430b8a0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.UnmodifiableIterator; @@ -32,10 +33,16 @@ import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.index.*; +import org.elasticsearch.index.CloseableIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexNameModule; +import org.elasticsearch.index.IndexServiceManagement; +import org.elasticsearch.index.LocalNodeIdModule; import org.elasticsearch.index.aliases.IndexAliasesServiceModule; import org.elasticsearch.index.analysis.AnalysisModule; import org.elasticsearch.index.analysis.AnalysisService; @@ -73,6 +80,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.common.collect.MapBuilder.*; @@ -122,11 +132,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent indices = ImmutableSet.copyOf(this.indices.keySet()); final CountDownLatch latch = new CountDownLatch(indices.size()); + + final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown")); + final ExecutorService shardsStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("shards_shutdown")); + for (final String index : indices) { - threadPool.cached().execute(new Runnable() { + indicesStopExecutor.execute(new Runnable() { @Override public void run() { try { - deleteIndex(index, false, "shutdown"); + deleteIndex(index, false, "shutdown", shardsStopExecutor); } catch (Exception e) { logger.warn("failed to delete index on stop [" + index + "]", e); } finally { @@ -139,6 +153,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent