Recovery: add a timeout to local mapping change check

After phase 1 of recovery completes, we check that all pending mapping changes have been sent to the master and processed by the other nodes. This is needed to make sure that the target node has the latest mapping (we just copied over the corresponding Lucene files). To make sure we do not miss any updates, we run the check under a local cluster state update task. At the moment there is no timeout while waiting for that task to complete, so if the local node's cluster state update thread is very busy, the recovery may stall for too long. This commit adds a timeout (equal to `indices.recovery.internal_action_timeout`) and upgrades the task's priority to `IMMEDIATE`. If we fail to perform the check, we fail the recovery.

Closes #9575
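
In outline, the new check looks like this (a condensed sketch of the `ShardRecoveryHandler` hunk below; the `execute` body is elided in the hunk and its no-op form here is an assumption, as are the surrounding fields `clusterService` and `recoverySettings`):

```java
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// IMMEDIATE: the check is very lightweight and must not queue behind other pending tasks
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {
    @Override
    public boolean runOnlyOnMaster() {
        return false; // run on the local (recovery source) node, not the master
    }

    @Override
    public TimeValue timeout() {
        // bounded by indices.recovery.internal_action_timeout
        return recoverySettings.internalActionTimeout();
    }

    @Override
    public ClusterState execute(ClusterState currentState) {
        // inspect pending mapping updates here; the task leaves the state unchanged
        return currentState;
    }

    @Override
    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        latch.countDown();
    }

    @Override
    public void onFailure(String source, Throwable t) {
        mappingCheckException.set(t); // a timeout surfaces here as a failure
        latch.countDown();
    }
});
latch.await();
if (mappingCheckException.get() != null) {
    // failing the check now fails the recovery instead of stalling it
    throw new ElasticsearchException("error during mapping check", mappingCheckException.get());
}
```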
commit 23022227d4
parent c6968883a7
Boaz Leskes, 2015-02-05 09:09:37 +01:00
3 changed files with 113 additions and 15 deletions

ClusterStateUpdateTask.java

@@ -19,7 +19,6 @@
package org.elasticsearch.cluster;

-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
@@ -36,7 +35,7 @@ abstract public class ClusterStateUpdateTask {
    /**
     * A callback called when execute fails.
     */
-    abstract public void onFailure(String source, @Nullable Throwable t);
+    abstract public void onFailure(String source, Throwable t);

    /**

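A side note on the `@Nullable` removal above: callers of `onFailure` now always supply a concrete cause, so implementations may handle `t` without a null check. A hypothetical implementation (illustrative only, using the logger style seen elsewhere in this commit):

```java
@Override
public void onFailure(String source, Throwable t) {
    // t is guaranteed non-null, so it can be logged or rethrown directly
    logger.error("cluster state update task [{}] failed", t, source);
}
```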
ShardRecoveryHandler.java

@@ -28,11 +28,12 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
+import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -480,7 +481,20 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
        // while we're checking
        final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
        final CountDownLatch latch = new CountDownLatch(1);
-        clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() {
+        final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
+        // we use immediate as this is a very lightweight check and we don't want to delay recovery
+        clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {
+            @Override
+            public boolean runOnlyOnMaster() {
+                return false;
+            }
+
+            @Override
+            public TimeValue timeout() {
+                return recoverySettings.internalActionTimeout();
+            }
+
            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                latch.countDown();
@@ -507,8 +521,8 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
            }

            @Override
-            public void onFailure(String source, @Nullable Throwable t) {
-                logger.error("unexpected error while checking for pending mapping changes", t);
+            public void onFailure(String source, Throwable t) {
+                mappingCheckException.set(t);
                latch.countDown();
            }
        });
@@ -518,6 +532,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
                latch.await();
            }
        });
+        if (mappingCheckException.get() != null) {
+            logger.warn("error during mapping check, failing recovery", mappingCheckException.get());
+            throw new ElasticsearchException("error during mapping check", mappingCheckException.get());
+        }
        if (documentMappersToUpdate.isEmpty()) {
            return;
        }
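
Distilled to plain Java, the error handling above is a standard pattern for surfacing a failure from an asynchronous callback to the calling thread: record the cause in an `AtomicReference`, release a latch, and rethrow on the waiting side. A minimal, self-contained sketch (not Elasticsearch code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncFailureDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        // stand-in for submitting a cluster state update task
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // the asynchronous check would run here
                    throw new RuntimeException("simulated timeout");
                } catch (Throwable t) {
                    failure.set(t); // record the cause instead of only logging it
                } finally {
                    latch.countDown(); // always release the waiter
                }
            }
        }).start();

        latch.await();
        if (failure.get() != null) {
            // rethrow on the calling thread so the enclosing operation (here: recovery) fails
            throw new RuntimeException("check failed", failure.get());
        }
    }
}
```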

RelocationTests.java

@@ -34,8 +34,10 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -48,10 +50,11 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
+import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -442,6 +445,92 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
        assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
    }

+    @Test
+    @TestLogging("cluster.service:TRACE,indices.recovery:TRACE")
+    public void testRelocationWithBusyClusterUpdateThread() throws Exception {
+        final String indexName = "test";
+        final Settings settings = ImmutableSettings.builder()
+                .put("gateway.type", "local")
+                .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s")
+                .put("indices.recovery.internal_action_timeout", "1s").build();
+        String master = internalCluster().startNode(settings);
+        ensureGreen();
+        List<String> nodes = internalCluster().startNodesAsync(2, settings).get();
+        final String node1 = nodes.get(0);
+        final String node2 = nodes.get(1);
+        ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
+        assertThat(response.isTimedOut(), is(false));
+        ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
+        client().admin().indices().prepareCreate(indexName)
+                .setSettings(
+                        ImmutableSettings.builder()
+                                .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1)
+                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                ).get();
+        List<IndexRequestBuilder> requests = new ArrayList<>();
+        int numDocs = scaledRandomIntBetween(25, 250);
+        for (int i = 0; i < numDocs; i++) {
+            requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
+        }
+        indexRandom(true, requests);
+        ensureSearchable(indexName);
+
+        // capture the incoming state that indicates that the replicas have been assigned
+        final CountDownLatch allReplicasAssigned = new CountDownLatch(1);
+        final CountDownLatch releaseClusterState = new CountDownLatch(1);
+        try {
+            clusterService.addLast(new ClusterStateListener() {
+                @Override
+                public void clusterChanged(ClusterChangedEvent event) {
+                    if (event.state().routingNodes().hasUnassignedShards() == false) {
+                        allReplicasAssigned.countDown();
+                        try {
+                            releaseClusterState.await();
+                        } catch (InterruptedException e) {
+                            //
+                        }
+                    }
+                }
+            });
+
+            logger.info("--> starting replica recovery");
+            // we don't expect this to be acknowledged by node1, where we block the cluster state thread
+            assertFalse(client().admin().indices().prepareUpdateSettings(indexName)
+                    .setSettings(ImmutableSettings.builder()
+                                    .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1 + "," + node2)
+                                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                    ).setTimeout("200ms")
+                    .get().isAcknowledged());
+
+            logger.info("--> waiting for node1 to process replica existence");
+            allReplicasAssigned.await();
+            logger.info("--> waiting for recovery to fail");
+            assertBusy(new Runnable() {
+                @Override
+                public void run() {
+                    ClusterHealthResponse response = client().admin().cluster().prepareHealth().get();
+                    assertThat(response.getUnassignedShards(), equalTo(1));
+                }
+            });
+        } finally {
+            logger.info("--> releasing cluster state update thread");
+            releaseClusterState.countDown();
+        }
+        logger.info("--> waiting for recovery to succeed");
+        // force a move
+        client().admin().cluster().prepareReroute().get();
+        ensureGreen();
+    }
+
    @Test
    @Slow
    public void testCancellationCleansTempFiles() throws Exception {
@@ -453,13 +542,6 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
                .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get();
        internalCluster().startNodesAsync(2).get();
-        List<IndexRequestBuilder> requests = new ArrayList<>();
-        int numDocs = scaledRandomIntBetween(25, 250);
-        for (int i = 0; i < numDocs; i++) {
-            requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
-        }
-        indexRandom(true, requests);
        assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut());
        flush();
@@ -548,4 +630,3 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
        }
    }
}