when shutting down a node, close shard (and indices) in parallel

This commit is contained in:
kimchy 2010-07-19 02:17:59 +03:00
parent d657d4447b
commit 0d20790ffe
3 changed files with 94 additions and 45 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.service;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.CloseableIndexComponent;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.index.IndexComponent;
@ -34,8 +35,6 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
@ -68,7 +67,7 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
int numberOfShards();
Set<Integer> shardIds();
ImmutableSet<Integer> shardIds();
boolean hasShard(int shardId);

View File

@ -20,7 +20,9 @@
package org.elasticsearch.index.service;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.component.CloseableIndexComponent;
@ -62,16 +64,17 @@ import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.collect.MapBuilder.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.common.collect.Sets.*;
/**
* @author kimchy (shay.banon)
@ -82,6 +85,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final Settings indexSettings;
private final ThreadPool threadPool;
private final PluginsService pluginsService;
private final InternalIndicesLifecycle indicesLifecycle;
@ -108,11 +113,12 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final CleanCacheOnIndicesLifecycleListener cleanCacheOnIndicesLifecycleListener = new CleanCacheOnIndicesLifecycleListener();
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings,
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool,
MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) {
super(index, indexSettings);
this.injector = injector;
this.threadPool = threadPool;
this.indexSettings = indexSettings;
this.mapperService = mapperService;
this.queryParserService = queryParserService;
@ -153,8 +159,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexShard;
}
@Override public Set<Integer> shardIds() {
return newHashSet(shards.keySet());
@Override public ImmutableSet<Integer> shardIds() {
return ImmutableSet.copyOf(shards.keySet());
}
@Override public Injector injector() {
@ -193,14 +199,27 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine;
}
@Override public synchronized void close(boolean delete) {
@Override public void close(final boolean delete) {
try {
for (int shardId : shardIds()) {
try {
deleteShard(shardId, delete, delete);
} catch (Exception e) {
logger.warn("failed to close shard, delete [{}]", e, delete);
}
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
for (final int shardId : shardIds) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
deleteShard(shardId, delete, delete);
} catch (Exception e) {
logger.warn("failed to close shard, delete [{}]", e, delete);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticSearchInterruptedException("interrupted closing index [ " + index().name() + "]", e);
}
} finally {
indicesLifecycle.removeListener(cleanCacheOnIndicesLifecycleListener);
@ -259,23 +278,27 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
deleteShard(shardId, true, false);
}
private synchronized void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
Injector shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
if (!delete) {
return;
private void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException {
Injector shardInjector;
IndexShard indexShard;
synchronized (this) {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
if (!delete) {
return;
}
throw new IndexShardMissingException(new ShardId(index, shardId));
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
if (delete) {
logger.debug("deleting shard_id [{}]", shardId);
}
throw new IndexShardMissingException(new ShardId(index, shardId));
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
if (delete) {
logger.debug("deleting shard_id [{}]", shardId);
}
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
IndexShard indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
}
ShardId sId = new ShardId(index, shardId);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.CloseableIndexComponent;
@ -56,11 +57,13 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.IndicesPluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.collect.MapBuilder.*;
@ -74,6 +77,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*;
@ThreadSafe
public class InternalIndicesService extends AbstractLifecycleComponent<IndicesService> implements IndicesService {
private final ThreadPool threadPool;
private final InternalIndicesLifecycle indicesLifecycle;
private final IndicesAnalysisService indicesAnalysisService;
@ -88,8 +93,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
@Inject public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
@Inject public InternalIndicesService(Settings settings, ThreadPool threadPool, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
super(settings);
this.threadPool = threadPool;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indicesAnalysisService = indicesAnalysisService;
this.indicesStore = indicesStore;
@ -112,8 +118,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
}
@Override protected void doStop() throws ElasticSearchException {
for (String index : indices.keySet()) {
deleteIndex(index, false);
ImmutableSet<String> indices = ImmutableSet.copyOf(this.indices.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size());
for (final String index : indices) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
deleteIndex(index, false);
} catch (Exception e) {
logger.warn("failed to delete index on stop [" + index + "]", e);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
@ -227,21 +250,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
deleteIndex(index, true);
}
private synchronized void deleteIndex(String index, boolean delete) throws ElasticSearchException {
Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
if (!delete) {
return;
private void deleteIndex(String index, boolean delete) throws ElasticSearchException {
Injector indexInjector;
IndexService indexService;
synchronized (this) {
indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
if (!delete) {
return;
}
throw new IndexMissingException(new Index(index));
}
if (delete) {
logger.debug("deleting Index [{}]", index);
}
throw new IndexMissingException(new Index(index));
}
if (delete) {
logger.debug("deleting Index [{}]", index);
}
Map<String, IndexService> tmpMap = newHashMap(indices);
IndexService indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
}
indicesLifecycle.beforeIndexClosed(indexService, delete);