From 0d20790ffe7aafd3fb2fe8c336aff723ab5c1c0a Mon Sep 17 00:00:00 2001 From: kimchy Date: Mon, 19 Jul 2010 02:17:59 +0300 Subject: [PATCH] when shutting down a node, close shard (and indices) in parallel --- .../index/service/IndexService.java | 5 +- .../index/service/InternalIndexService.java | 75 ++++++++++++------- .../indices/InternalIndicesService.java | 59 +++++++++++---- 3 files changed, 94 insertions(+), 45 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java index 48c9f57e84c..9547720e35e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.service; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.CloseableIndexComponent; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.index.IndexComponent; @@ -34,8 +35,6 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; -import java.util.Set; - /** * @author kimchy (shay.banon) */ @@ -68,7 +67,7 @@ public interface IndexService extends IndexComponent, Iterable, Clos int numberOfShards(); - Set shardIds(); + ImmutableSet shardIds(); boolean hasShard(int shardId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 066023de096..eac4815e6d7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -20,7 +20,9 @@ package org.elasticsearch.index.service; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.component.CloseableIndexComponent; @@ -62,16 +64,17 @@ import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; +import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.Maps.*; -import static org.elasticsearch.common.collect.Sets.*; /** * @author kimchy (shay.banon) @@ -82,6 +85,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final Settings indexSettings; + private final ThreadPool threadPool; + private final PluginsService pluginsService; private final InternalIndicesLifecycle indicesLifecycle; @@ -108,11 +113,12 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final CleanCacheOnIndicesLifecycleListener cleanCacheOnIndicesLifecycleListener = new CleanCacheOnIndicesLifecycleListener(); - @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, + @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) { super(index, indexSettings); this.injector = injector; + this.threadPool = threadPool; this.indexSettings = indexSettings; this.mapperService = mapperService; this.queryParserService = queryParserService; @@ -153,8 +159,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexShard; } - @Override public Set shardIds() { - return newHashSet(shards.keySet()); + @Override public ImmutableSet shardIds() { + return ImmutableSet.copyOf(shards.keySet()); } @Override public Injector injector() { @@ -193,14 +199,27 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexEngine; } - @Override public synchronized void close(boolean delete) { + @Override public void close(final boolean delete) { try { - for (int shardId : shardIds()) { - try { - deleteShard(shardId, delete, delete); - } catch (Exception e) { - logger.warn("failed to close shard, delete [{}]", e, delete); - } + Set shardIds = shardIds(); + final CountDownLatch latch = new CountDownLatch(shardIds.size()); + for (final int shardId : shardIds) { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + try { + deleteShard(shardId, delete, delete); + } catch (Exception e) { + logger.warn("failed to close shard, delete [{}]", e, delete); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new ElasticSearchInterruptedException("interrupted closing index [ " + index().name() + "]", e); } } finally { indicesLifecycle.removeListener(cleanCacheOnIndicesLifecycleListener); @@ -259,23 +278,27 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde deleteShard(shardId, true, false); } - private synchronized void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException { - Map tmpShardInjectors = newHashMap(shardsInjectors); - Injector shardInjector = tmpShardInjectors.remove(shardId); - if (shardInjector == null) { - if (!delete) { - return; + private void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException { + Injector shardInjector; + IndexShard indexShard; + synchronized (this) { + Map tmpShardInjectors = newHashMap(shardsInjectors); + shardInjector = tmpShardInjectors.remove(shardId); + if (shardInjector == null) { + if (!delete) { + return; + } + throw new IndexShardMissingException(new ShardId(index, shardId)); + } + shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); + if (delete) { + logger.debug("deleting shard_id [{}]", shardId); } - throw new IndexShardMissingException(new ShardId(index, shardId)); - } - shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); - if (delete) { - logger.debug("deleting shard_id [{}]", shardId); - } - Map tmpShardsMap = newHashMap(shards); - IndexShard indexShard = tmpShardsMap.remove(shardId); - shards = ImmutableMap.copyOf(tmpShardsMap); + Map tmpShardsMap = newHashMap(shards); + indexShard = tmpShardsMap.remove(shardId); + shards = ImmutableMap.copyOf(tmpShardsMap); + } ShardId sId = new ShardId(index, shardId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 516f6909e23..a7ccfece75d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.CloseableIndexComponent; @@ -56,11 +57,13 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.plugins.IndicesPluginsModule; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.common.collect.MapBuilder.*; @@ -74,6 +77,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*; @ThreadSafe public class InternalIndicesService extends AbstractLifecycleComponent implements IndicesService { + private final ThreadPool threadPool; + private final InternalIndicesLifecycle indicesLifecycle; private final IndicesAnalysisService indicesAnalysisService; @@ -88,8 +93,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent indices = ImmutableMap.of(); - @Inject public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) { + @Inject public InternalIndicesService(Settings settings, ThreadPool threadPool, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) { super(settings); + this.threadPool = threadPool; this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indicesAnalysisService = indicesAnalysisService; this.indicesStore = indicesStore; @@ -112,8 +118,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent indices = ImmutableSet.copyOf(this.indices.keySet()); + final CountDownLatch latch = new CountDownLatch(indices.size()); + for (final String index : indices) { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + try { + deleteIndex(index, false); + } catch (Exception e) { + logger.warn("failed to delete index on stop [" + index + "]", e); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + // ignore } } @@ -227,21 +250,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent tmpMap = newHashMap(indices); - IndexService indexService = tmpMap.remove(index); - indices = ImmutableMap.copyOf(tmpMap); + Map tmpMap = newHashMap(indices); + indexService = tmpMap.remove(index); + indices = ImmutableMap.copyOf(tmpMap); + } indicesLifecycle.beforeIndexClosed(indexService, delete);