Raise default DeleteIndex Timeout

Currently the timeout for an delete index operation is set to 10 seconds.
Yet, if a full flush is running while we delete and index this can
easily exceed 10 seconds. The timeout is not dramatic ie. the index
will be deleted eventually but the client request is not acked which
can cause confusion. We should raise it to prevent unnecessary confusion
especially in client tests where this can happen if the machine is pretty busy.

The new timeout is set to 60 seconds.

Closes #3498
This commit is contained in:
Simon Willnauer 2013-08-13 16:50:16 +02:00
parent 534299a27c
commit 7e1d8a6ca3
10 changed files with 90 additions and 108 deletions

View File

@ -38,7 +38,7 @@ public class DeleteIndexRequest extends MasterNodeOperationRequest<DeleteIndexRe
private String[] indices;
private TimeValue timeout = timeValueSeconds(10);
private TimeValue timeout = timeValueSeconds(60);
DeleteIndexRequest() {
}

View File

@ -36,7 +36,7 @@ public class DeleteIndexRequestBuilder extends MasterNodeOperationRequestBuilder
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
* to <tt>60s</tt>.
*/
public DeleteIndexRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);

View File

@ -102,7 +102,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
private void deleteIndex(final Request request, final Listener userListener, Semaphore mdLock) {
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener);
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
@ -141,6 +141,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
// add the notifications that the store was deleted from *data* nodes
count += currentState.nodes().dataNodes().size();
final AtomicInteger counter = new AtomicInteger(count);
// this listener will be notified once we get back a notification based on the cluster state change below.
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override
public void onNodeIndexDeleted(String index, String nodeId) {
@ -185,13 +186,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private final AtomicBoolean notified = new AtomicBoolean();
private final Semaphore mdLock;
private final Request request;
private final Listener listener;
volatile ScheduledFuture future;
volatile ScheduledFuture<?> future;
private DeleteIndexListener(Semaphore mdLock, Request request, Listener listener) {
private DeleteIndexListener(Semaphore mdLock, Listener listener) {
this.mdLock = mdLock;
this.request = request;
this.listener = listener;
}

View File

@ -177,7 +177,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
try {
writeGlobalState("changed", newMetaData, currentMetaData);
} catch (Exception e) {
} catch (Throwable e) {
success = false;
}
}
@ -205,7 +205,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
try {
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
} catch (Exception e) {
} catch (Throwable e) {
success = false;
}
}
@ -228,7 +228,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId());
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
@ -268,7 +268,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to find dangling indices", e);
}
}
@ -306,7 +306,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
logger.info("failed to send allocated dangled", e);
}
});
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to send allocate dangled", e);
}
}
@ -337,7 +337,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
builder.flush();
String stateFileName = "state-" + indexMetaData.version();
Exception lastFailure = null;
Throwable lastFailure = null;
boolean wroteAtLeastOnce = false;
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
File stateLocation = new File(indexLocation, "_state");
@ -352,7 +352,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
fos.getChannel().force(true);
fos.close();
wroteAtLeastOnce = true;
} catch (Exception e) {
} catch (Throwable e) {
lastFailure = e;
} finally {
IOUtils.closeWhileHandlingException(fos);
@ -396,7 +396,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
builder.flush();
String globalFileName = "global-" + globalMetaData.version();
Exception lastFailure = null;
Throwable lastFailure = null;
boolean wroteAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
@ -411,7 +411,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
fos.getChannel().force(true);
fos.close();
wroteAtLeastOnce = true;
} catch (Exception e) {
} catch (Throwable e) {
lastFailure = e;
} finally {
IOUtils.closeWhileHandlingException(fos);
@ -498,7 +498,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("[{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, index);
}
}
@ -543,7 +543,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to load global state from [{}]", e, stateFile.getAbsolutePath());
}
}

View File

@ -296,6 +296,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Override
public synchronized IndexShard createShard(int sShardId) throws ElasticSearchException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
* keep it synced.
*/
if (closed) {
throw new ElasticSearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
@ -348,105 +353,92 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Override
public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException {
Injector shardInjector;
IndexShard indexShard;
synchronized (this) {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
final Injector shardInjector;
final IndexShard indexShard;
final ShardId sId = new ShardId(index, shardId);
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}
ShardId sId = new ShardId(index, shardId);
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
indicesLifecycle.beforeIndexShardClosed(sId, indexShard);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
try {
shardInjector.getInstance(closeable).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to clean plugin shard service [{}]", e, closeable);
}
}
try {
// now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close translog service", e);
// ignore
}
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
try {
((InternalIndexShard) indexShard).close(reason);
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close index shard", e);
// ignore
}
}
try {
shardInjector.getInstance(Engine.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close engine", e);
// ignore
}
try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close merge policy provider", e);
// ignore
}
try {
shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to snapshot index shard gateway on close", e);
// ignore
}
try {
shardInjector.getInstance(IndexShardGatewayService.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close index shard gateway", e);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close translog", e);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(PercolatorQueriesRegistry.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close PercolatorQueriesRegistry", e);
// ignore
}
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId);
// if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class);
// and close it
try {
store.close();
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to close store on shard deletion", e);
}
Injectors.close(injector);
}
}

View File

@ -564,7 +564,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
mergeScheduleFuture = null;
}
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
if (logger.isDebugEnabled()) {
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
}
state = IndexShardState.CLOSED;
}
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
@ -77,7 +76,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.IndexPluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap;
import java.util.Map;
@ -99,10 +97,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
*/
public class InternalIndicesService extends AbstractLifecycleComponent<IndicesService> implements IndicesService {
private final NodeEnvironment nodeEnv;
private final ThreadPool threadPool;
private final InternalIndicesLifecycle indicesLifecycle;
private final IndicesAnalysisService indicesAnalysisService;
@ -120,10 +114,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private final OldShardsStats oldShardsStats = new OldShardsStats();
@Inject
public InternalIndicesService(Settings settings, NodeEnvironment nodeEnv, ThreadPool threadPool, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indicesAnalysisService = indicesAnalysisService;
this.indicesStore = indicesStore;
@ -400,24 +392,21 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
}
@Override
public synchronized void removeIndex(String index, String reason) throws ElasticSearchException {
public void removeIndex(String index, String reason) throws ElasticSearchException {
removeIndex(index, reason, null);
}
private void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticSearchException {
Injector indexInjector;
private synchronized void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticSearchException {
IndexService indexService;
synchronized (this) {
indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
}
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
}
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
indicesLifecycle.beforeIndexClosed(indexService);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {

View File

@ -151,7 +151,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}][{}] removing shard (disabled block persistence)", index, shardId);
try {
indexService.removeShard(shardId, "removing shard (disabled block persistence)");
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to remove shard (disabled block persistence)", e, index);
}
}
@ -174,17 +174,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void sendIndexLifecycleEvents(final ClusterChangedEvent event) {
String localNodeId = event.state().nodes().localNodeId();
assert localNodeId != null;
for (String index : event.indicesCreated()) {
try {
nodeIndexCreatedAction.nodeIndexCreated(index, event.state().nodes().localNodeId());
} catch (Exception e) {
nodeIndexCreatedAction.nodeIndexCreated(index, localNodeId);
} catch (Throwable e) {
logger.debug("failed to send to master index {} created event", e, index);
}
}
for (String index : event.indicesDeleted()) {
try {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
} catch (Exception e) {
nodeIndexDeletedAction.nodeIndexDeleted(index, localNodeId);
} catch (Throwable e) {
logger.debug("failed to send to master index {} deleted event", e, index);
}
}
@ -195,7 +197,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (event.indicesStateChanged()) {
try {
nodeIndicesStateUpdatedAction.nodeIndexStateUpdated(new NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse(event.state().nodes().localNodeId(), event.state().version()));
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to send to master indices state change event", e);
}
}
@ -212,7 +214,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}][{}] removing shard (index is closed)", index, shardId);
try {
indexService.removeShard(shardId, "removing shard (index is closed)");
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to remove shard (index is closed)", e, index);
}
}
@ -226,7 +228,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// clean the index
try {
indicesService.removeIndex(index, "removing index (no shards allocated)");
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to clean index (no shards of that index are allocated on this node)", e, index);
}
}
@ -241,7 +243,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
indicesService.removeIndex(index, "index no longer part of the metadata");
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to clean index", e);
}
// clear seen mappings as well
@ -418,7 +420,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index, mappingType, mappingSource);
}
return requiresRefresh;
@ -476,7 +478,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
newAliases.put(alias, indexAliasesService.create(alias, filter));
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to add alias [{}], filter [{}]", e, index, alias, filter);
}
}
@ -607,7 +609,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
break;
}
@ -643,7 +645,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
}
@ -688,7 +690,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexService.removeShard(shardRouting.shardId().id(), "ignore recovery: " + reason);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
@ -709,14 +711,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
try {
shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
}
}
@ -748,13 +750,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexService.removeShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed engine", e1, indexService.index().name(), shardId.id());
}
}
try {
shardStateAction.shardFailed(fShardRouting, "engine failure, message [" + detailedMessage(failure) + "]");
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id());
}
}

View File

@ -105,6 +105,8 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
.persistentSettings().getAsMap().size(), equalTo(0));
assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
.persistentSettings().getAsMap().size(), equalTo(0));
wipeIndices(); // wipe after to make sure we fail in the test that didn't ack the delete
wipeTemplates();
}
public static TestCluster cluster() {

View File

@ -33,14 +33,14 @@ import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class UpdateTests extends AbstractSharedClusterTest {
@ -446,21 +446,14 @@ public class UpdateTests extends AbstractSharedClusterTest {
@Test
public void testConcurrentUpdateWithRetryOnConflict() throws Exception {
concurrentUpdateWithRetryOnConflict(false);
}
@Test
public void testConcurrentUpdateWithRetryOnConflict_bulk() throws Exception {
concurrentUpdateWithRetryOnConflict(true);
}
private void concurrentUpdateWithRetryOnConflict(final boolean useBulkApi) throws Exception {
final boolean useBulkApi = randomBoolean();
createIndex();
ensureGreen();
int numberOfThreads = 5;
int numberOfThreads = between(2,5);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final int numberOfUpdatesPerThread = 10000;
final int numberOfUpdatesPerThread = between(1000, 10000);
final List<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (int i = 0; i < numberOfThreads; i++) {
Runnable r = new Runnable() {
@ -481,8 +474,8 @@ public class UpdateTests extends AbstractSharedClusterTest {
.execute().actionGet();
}
}
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable e) {
failures.add(e);
} finally {
latch.countDown();
}
@ -492,7 +485,10 @@ public class UpdateTests extends AbstractSharedClusterTest {
new Thread(r).start();
}
latch.await();
for (Throwable throwable : failures) {
logger.info("Captured failure on concurrent update:", throwable);
}
assertThat(failures.size(), equalTo(0));
for (int i = 0; i < numberOfUpdatesPerThread; i++) {
GetResponse response = client().prepareGet("test", "type1", Integer.toString(i)).execute().actionGet();
assertThat(response.getId(), equalTo(Integer.toString(i)));