diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 2e77653183a..9775d3127c5 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.service; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -30,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.*; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -84,16 +82,12 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -109,8 +103,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final Settings indexSettings; - private final ThreadPool threadPool; - private final PluginsService pluginsService; private final InternalIndicesLifecycle indicesLifecycle; @@ -148,14 +140,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final AtomicBoolean closed = new AtomicBoolean(false); @Inject - public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, + public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache ) { super(index, indexSettings); this.injector = injector; - this.threadPool = threadPool; this.indexSettings = indexSettings; this.analysisService = analysisService; this.mapperService = mapperService; @@ -279,37 +270,19 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexEngine; } - public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) { - if (closed.compareAndSet(false, true)) { - final Set shardIds = shardIds(); - final CountDownLatch latch = new CountDownLatch(shardIds.size()); - final IndicesService.IndexCloseListener innerListener = listener == null ? null : - new PerShardIndexCloseListener(shardIds, listener); - for (final int shardId : shardIds) { - executor = executor == null ? threadPool.generic() : executor; - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.warn("failed to close shard", t); - } - - @Override - public void doRun() { - try { - removeShard(shardId, reason, innerListener); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - logger.debug("Interrupted closing index [{}]", e, index().name()); - Thread.currentThread().interrupt(); - } - } + public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) { + if (closed.compareAndSet(false, true)) { + final Set shardIds = shardIds(); + final IndicesService.IndexCloseListener innerListener = listener == null ? null : + new PerShardIndexCloseListener(shardIds, listener); + for (final int shardId : shardIds) { + try { + removeShard(shardId, reason, innerListener); + } catch (Throwable t) { + logger.warn("failed to close shard", t); + } + } + } } @Override diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 5e0ea93e94e..47a05f4c036 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -65,7 +65,6 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; -import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; @@ -75,8 +74,10 @@ import org.elasticsearch.plugins.PluginsService; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -128,22 +129,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent failures) { latch.countDown(); } + @Override - public void onShardClosed(ShardId shardId) {} + public void onShardClosed(ShardId shardId) { + } + @Override - public void onShardCloseFailed(ShardId shardId, Throwable t) {} + public void onShardCloseFailed(ShardId shardId, Throwable t) { + } }); } catch (Throwable e) { latch.countDown(); @@ -159,7 +163,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent tmpMap = newHashMap(indices); + indexService = tmpMap.remove(index); + indices = ImmutableMap.copyOf(tmpMap); } - - logger.debug("[{}] closing ... (reason [{}])", index, reason); - Map tmpMap = newHashMap(indices); - indexService = tmpMap.remove(index); - indices = ImmutableMap.copyOf(tmpMap); - indicesLifecycle.beforeIndexClosed(indexService); for (Class closeable : pluginsService.indexServices()) { @@ -352,7 +353,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent