From 23022227d4c82c691ce03130de659d50f758cbe2 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 5 Feb 2015 09:09:37 +0100 Subject: [PATCH] Recovery: add a timeout to local mapping change check After phase1 of recovery is completed, we check that all pending mapping changes have been sent to the master and processed by the other nodes. This is needed in order to make sure that the target node has the latest mapping (we just copied over the corresponding lucene files). To make sure we do not miss updates, we do so under a local cluster state update task. At the moment we don't have a timeout when waiting on the task to be completed. If the local node update thread is very busy, this may stall the recovery for too long. This commit adds a timeout (equal to `indices.recovery.internal_action_timeout`) and upgrade the task urgency to `IMMEDIATE`. If we fail to perform the check, we fail the recovery. Closes #9575 --- .../cluster/ClusterStateUpdateTask.java | 3 +- .../recovery/ShardRecoveryHandler.java | 26 ++++- .../recovery/RelocationTests.java | 99 +++++++++++++++++-- 3 files changed, 113 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 921b6d149ee..e1a17c202a8 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; /** @@ -36,7 +35,7 @@ abstract public class ClusterStateUpdateTask { /** * A callback called when execute fails. */ - abstract public void onFailure(String source, @Nullable Throwable t); + abstract public void onFailure(String source, Throwable t); /** diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index 8ad44eaab48..01c049c7c80 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -28,11 +28,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -480,7 +481,20 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { // while we're checking final BlockingQueue documentMappersToUpdate = ConcurrentCollections.newBlockingQueue(); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() { + final AtomicReference mappingCheckException = new AtomicReference<>(); + // we use immediate as this is a very light weight check and we don't wait to delay recovery + clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() { + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public TimeValue timeout() { + return recoverySettings.internalActionTimeout(); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { latch.countDown(); @@ -507,8 +521,8 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { } @Override - public void onFailure(String source, @Nullable Throwable t) { - logger.error("unexpected error while checking for pending mapping changes", t); + public void onFailure(String source, Throwable t) { + mappingCheckException.set(t); latch.countDown(); } }); @@ -518,6 +532,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { latch.await(); } }); + if (mappingCheckException.get() != null) { + logger.warn("error during mapping check, failing recovery", mappingCheckException.get()); + throw new ElasticsearchException("error during mapping check", mappingCheckException.get()); + } if (documentMappersToUpdate.isEmpty()) { return; } diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java index 63012cfb4e7..a4036c41402 100644 --- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java +++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java @@ -34,8 +34,10 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -48,10 +50,11 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoveryService; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -442,6 +445,92 @@ public class RelocationTests extends ElasticsearchIntegrationTest { assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty()); } + @Test + @TestLogging("cluster.service:TRACE,indices.recovery:TRACE") + public void testRelocationWithBusyClusterUpdateThread() throws Exception { + final String indexName = "test"; + final Settings settings = ImmutableSettings.builder() + .put("gateway.type", "local") + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") + .put("indices.recovery.internal_action_timeout", "1s").build(); + String master = internalCluster().startNode(settings); + ensureGreen(); + List nodes = internalCluster().startNodesAsync(2, settings).get(); + final String node1 = nodes.get(0); + final String node2 = nodes.get(1); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + + + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); + + + client().admin().indices().prepareCreate(indexName) + .setSettings( + ImmutableSettings.builder() + .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + + + List requests = new ArrayList<>(); + int numDocs = scaledRandomIntBetween(25, 250); + for (int i = 0; i < numDocs; i++) { + requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}")); + } + indexRandom(true, requests); + ensureSearchable(indexName); + + // capture the incoming state indicate that the replicas have upgraded and assigned + + final CountDownLatch allReplicasAssigned = new CountDownLatch(1); + final CountDownLatch releaseClusterState = new CountDownLatch(1); + try { + clusterService.addLast(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().routingNodes().hasUnassignedShards() == false) { + allReplicasAssigned.countDown(); + try { + releaseClusterState.await(); + } catch (InterruptedException e) { + // + } + } + } + }); + + logger.info("--> starting replica recovery"); + // we don't expect this to be acknowledge by node1 where we block the cluster state thread + assertFalse(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(ImmutableSettings.builder() + .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1 + "," + node2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + + ).setTimeout("200ms") + .get().isAcknowledged()); + + logger.info("--> waiting for node1 to process replica existence"); + allReplicasAssigned.await(); + logger.info("--> waiting for recovery to fail"); + assertBusy(new Runnable() { + @Override + public void run() { + ClusterHealthResponse response = client().admin().cluster().prepareHealth().get(); + assertThat(response.getUnassignedShards(), equalTo(1)); + } + }); + } finally { + logger.info("--> releasing cluster state update thread"); + releaseClusterState.countDown(); + } + logger.info("--> waiting for recovery to succeed"); + // force a move. + client().admin().cluster().prepareReroute().get(); + ensureGreen(); + } + @Test @Slow public void testCancellationCleansTempFiles() throws Exception { @@ -453,13 +542,6 @@ public class RelocationTests extends ElasticsearchIntegrationTest { .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); internalCluster().startNodesAsync(2).get(); - - List requests = new ArrayList<>(); - int numDocs = scaledRandomIntBetween(25, 250); - for (int i = 0; i < numDocs; i++) { - requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}")); - } - indexRandom(true, requests); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut()); flush(); @@ -548,4 +630,3 @@ public class RelocationTests extends ElasticsearchIntegrationTest { } } } -