improve shutodwn with many indices / shards by bounding the concurrency of closing indices / shards

This commit is contained in:
kimchy 2011-06-10 03:48:43 +03:00
parent b143400bea
commit 3b2c186503
2 changed files with 34 additions and 10 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.service;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.collect.UnmodifiableIterator;
@ -33,7 +34,11 @@ import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.*; import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.IndexCache;
@ -73,6 +78,7 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.MapBuilder.*;
import static org.elasticsearch.common.collect.Maps.*; import static org.elasticsearch.common.collect.Maps.*;
@ -216,14 +222,15 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine; return indexEngine;
} }
public void close(final boolean delete, final String reason) { public void close(final boolean delete, final String reason, @Nullable Executor executor) {
synchronized (this) { synchronized (this) {
closed = true; closed = true;
} }
Set<Integer> shardIds = shardIds(); Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size()); final CountDownLatch latch = new CountDownLatch(shardIds.size());
for (final int shardId : shardIds) { for (final int shardId : shardIds) {
threadPool.cached().execute(new Runnable() { executor = executor == null ? threadPool.cached() : executor;
executor.execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
deleteShard(shardId, delete, !delete, delete, reason); deleteShard(shardId, delete, !delete, delete, reason);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.collect.UnmodifiableIterator;
@ -32,10 +33,16 @@ import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.*; import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNameModule;
import org.elasticsearch.index.IndexServiceManagement;
import org.elasticsearch.index.LocalNodeIdModule;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule; import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule; import org.elasticsearch.index.analysis.AnalysisModule;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
@ -73,6 +80,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.collect.MapBuilder.*; import static org.elasticsearch.common.collect.MapBuilder.*;
@ -122,11 +132,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
ImmutableSet<String> indices = ImmutableSet.copyOf(this.indices.keySet()); ImmutableSet<String> indices = ImmutableSet.copyOf(this.indices.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size()); final CountDownLatch latch = new CountDownLatch(indices.size());
final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
final ExecutorService shardsStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("shards_shutdown"));
for (final String index : indices) { for (final String index : indices) {
threadPool.cached().execute(new Runnable() { indicesStopExecutor.execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
deleteIndex(index, false, "shutdown"); deleteIndex(index, false, "shutdown", shardsStopExecutor);
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to delete index on stop [" + index + "]", e); logger.warn("failed to delete index on stop [" + index + "]", e);
} finally { } finally {
@ -139,6 +153,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} finally {
shardsStopExecutor.shutdown();
indicesStopExecutor.shutdown();
} }
} }
@ -262,14 +279,14 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} }
@Override public synchronized void cleanIndex(String index, String reason) throws ElasticSearchException { @Override public synchronized void cleanIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, false, reason); deleteIndex(index, false, reason, null);
} }
@Override public synchronized void deleteIndex(String index, String reason) throws ElasticSearchException { @Override public synchronized void deleteIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, true, reason); deleteIndex(index, true, reason, null);
} }
private void deleteIndex(String index, boolean delete, String reason) throws ElasticSearchException { private void deleteIndex(String index, boolean delete, String reason, @Nullable Executor executor) throws ElasticSearchException {
Injector indexInjector; Injector indexInjector;
IndexService indexService; IndexService indexService;
synchronized (this) { synchronized (this) {
@ -295,7 +312,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexInjector.getInstance(closeable).close(delete); indexInjector.getInstance(closeable).close(delete);
} }
((InternalIndexService) indexService).close(delete, reason); ((InternalIndexService) indexService).close(delete, reason, executor);
indexInjector.getInstance(PercolatorService.class).close(); indexInjector.getInstance(PercolatorService.class).close();
indexInjector.getInstance(IndexCache.class).close(); indexInjector.getInstance(IndexCache.class).close();