Internal: IndexService - synchronize close to prevent race condition with shard creation

During node shutdown we have a race condition between processing cluster state updates (creating shards) and closing down the index service. This may cause shards to leak and not be closed properly.

This commit removes the concurrency in shard closing as InternalIndexService.removeShard has been synchronized for a long time now.

On the other hand, the commit restores the parallel shutdown of indices lost in 7e1d8a6ca3

Closes #8557
This commit is contained in:
Boaz Leskes 2014-11-19 10:37:18 +01:00
parent 69ac838259
commit fb81a3203b
2 changed files with 38 additions and 64 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.service;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator; import com.google.common.collect.UnmodifiableIterator;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -30,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*; import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLock;
@ -84,16 +82,12 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule; import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -109,8 +103,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final Settings indexSettings; private final Settings indexSettings;
private final ThreadPool threadPool;
private final PluginsService pluginsService; private final PluginsService pluginsService;
private final InternalIndicesLifecycle indicesLifecycle; private final InternalIndicesLifecycle indicesLifecycle;
@ -148,14 +140,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
@Inject @Inject
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine, SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine,
IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData,
BitsetFilterCache bitSetFilterCache ) { BitsetFilterCache bitSetFilterCache ) {
super(index, indexSettings); super(index, indexSettings);
this.injector = injector; this.injector = injector;
this.threadPool = threadPool;
this.indexSettings = indexSettings; this.indexSettings = indexSettings;
this.analysisService = analysisService; this.analysisService = analysisService;
this.mapperService = mapperService; this.mapperService = mapperService;
@ -279,36 +270,18 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine; return indexEngine;
} }
public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) { public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds(); final Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
final IndicesService.IndexCloseListener innerListener = listener == null ? null : final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener); new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) { for (final int shardId : shardIds) {
executor = executor == null ? threadPool.generic() : executor;
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("failed to close shard", t);
}
@Override
public void doRun() {
try { try {
removeShard(shardId, reason, innerListener); removeShard(shardId, reason, innerListener);
} finally { } catch (Throwable t) {
latch.countDown(); logger.warn("failed to close shard", t);
} }
} }
});
}
try {
latch.await();
} catch (InterruptedException e) {
logger.debug("Interrupted closing index [{}]", e, index().name());
Thread.currentThread().interrupt();
}
} }
} }

View File

@ -65,7 +65,6 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
@ -75,8 +74,10 @@ import org.elasticsearch.plugins.PluginsService;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -128,22 +129,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
final CountDownLatch latch = new CountDownLatch(indices.size()); final CountDownLatch latch = new CountDownLatch(indices.size());
final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown")); final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
final ExecutorService shardsStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("shards_shutdown"));
for (final String index : indices) { for (final String index : indices) {
indicesStopExecutor.execute(new Runnable() { indicesStopExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
removeIndex(index, "shutdown", shardsStopExecutor, new IndexCloseListener() { removeIndex(index, "shutdown", new IndexCloseListener() {
@Override @Override
public void onAllShardsClosed(Index index, List<Throwable> failures) { public void onAllShardsClosed(Index index, List<Throwable> failures) {
latch.countDown(); latch.countDown();
} }
@Override @Override
public void onShardClosed(ShardId shardId) {} public void onShardClosed(ShardId shardId) {
}
@Override @Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {} public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
}); });
} catch (Throwable e) { } catch (Throwable e) {
latch.countDown(); latch.countDown();
@ -159,7 +163,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} finally { } finally {
shardsStopExecutor.shutdown();
indicesStopExecutor.shutdown(); indicesStopExecutor.shutdown();
} }
} }
@ -325,17 +328,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override @Override
public void removeIndex(String index, String reason) throws ElasticsearchException { public void removeIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, null, null); removeIndex(index, reason, null);
} }
@Override @Override
public void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException { public void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException {
removeIndex(index, reason, null, listener); final IndexService indexService;
} final Injector indexInjector;
synchronized (this) {
private synchronized void removeIndex(String index, String reason, @Nullable Executor executor, @Nullable IndexCloseListener listener) throws ElasticsearchException { indexInjector = indicesInjectors.remove(index);
IndexService indexService;
Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) { if (indexInjector == null) {
return; return;
} }
@ -344,7 +345,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
Map<String, IndexService> tmpMap = newHashMap(indices); Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index); indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap); indices = ImmutableMap.copyOf(tmpMap);
}
indicesLifecycle.beforeIndexClosed(indexService); indicesLifecycle.beforeIndexClosed(indexService);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) { for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
@ -352,7 +353,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} }
logger.debug("[{}] closing index service", index, reason); logger.debug("[{}] closing index service", index, reason);
((InternalIndexService) indexService).close(reason, executor, listener); ((InternalIndexService) indexService).close(reason, listener);
logger.debug("[{}] closing index cache", index, reason); logger.debug("[{}] closing index cache", index, reason);
indexInjector.getInstance(IndexCache.class).close(); indexInjector.getInstance(IndexCache.class).close();