diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 921b6d149ee..e1a17c202a8 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; /** @@ -36,7 +35,7 @@ abstract public class ClusterStateUpdateTask { /** * A callback called when execute fails. */ - abstract public void onFailure(String source, @Nullable Throwable t); + abstract public void onFailure(String source, Throwable t); /** diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index 8ad44eaab48..01c049c7c80 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -28,11 +28,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -480,7 +481,20 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { // while we're checking final BlockingQueue documentMappersToUpdate = ConcurrentCollections.newBlockingQueue(); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() { + final AtomicReference mappingCheckException = new AtomicReference<>(); + // we use immediate as this is a very light weight check and we don't wait to delay recovery + clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() { + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public TimeValue timeout() { + return recoverySettings.internalActionTimeout(); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { latch.countDown(); @@ -507,8 +521,8 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { } @Override - public void onFailure(String source, @Nullable Throwable t) { - logger.error("unexpected error while checking for pending mapping changes", t); + public void onFailure(String source, Throwable t) { + mappingCheckException.set(t); latch.countDown(); } }); @@ -518,6 +532,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { latch.await(); } }); + if (mappingCheckException.get() != null) { + logger.warn("error during mapping check, failing recovery", mappingCheckException.get()); + throw new ElasticsearchException("error during mapping check", mappingCheckException.get()); + } if (documentMappersToUpdate.isEmpty()) { return; } diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java index 63012cfb4e7..a4036c41402 100644 --- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java +++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java @@ -34,8 +34,10 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -48,10 +50,11 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoveryService; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -442,6 +445,92 @@ public class RelocationTests extends ElasticsearchIntegrationTest { assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty()); } + @Test + @TestLogging("cluster.service:TRACE,indices.recovery:TRACE") + public void testRelocationWithBusyClusterUpdateThread() throws Exception { + final String indexName = "test"; + final Settings settings = ImmutableSettings.builder() + .put("gateway.type", "local") + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") + .put("indices.recovery.internal_action_timeout", "1s").build(); + String master = internalCluster().startNode(settings); + ensureGreen(); + List nodes = internalCluster().startNodesAsync(2, settings).get(); + final String node1 = nodes.get(0); + final String node2 = nodes.get(1); + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + + + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); + + + client().admin().indices().prepareCreate(indexName) + .setSettings( + ImmutableSettings.builder() + .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + + + List requests = new ArrayList<>(); + int numDocs = scaledRandomIntBetween(25, 250); + for (int i = 0; i < numDocs; i++) { + requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}")); + } + indexRandom(true, requests); + ensureSearchable(indexName); + + // capture the incoming state indicate that the replicas have upgraded and assigned + + final CountDownLatch allReplicasAssigned = new CountDownLatch(1); + final CountDownLatch releaseClusterState = new CountDownLatch(1); + try { + clusterService.addLast(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().routingNodes().hasUnassignedShards() == false) { + allReplicasAssigned.countDown(); + try { + releaseClusterState.await(); + } catch (InterruptedException e) { + // + } + } + } + }); + + logger.info("--> starting replica recovery"); + // we don't expect this to be acknowledge by node1 where we block the cluster state thread + assertFalse(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(ImmutableSettings.builder() + .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1 + "," + node2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + + ).setTimeout("200ms") + .get().isAcknowledged()); + + logger.info("--> waiting for node1 to process replica existence"); + allReplicasAssigned.await(); + logger.info("--> waiting for recovery to fail"); + assertBusy(new Runnable() { + @Override + public void run() { + ClusterHealthResponse response = client().admin().cluster().prepareHealth().get(); + assertThat(response.getUnassignedShards(), equalTo(1)); + } + }); + } finally { + logger.info("--> releasing cluster state update thread"); + releaseClusterState.countDown(); + } + logger.info("--> waiting for recovery to succeed"); + // force a move. + client().admin().cluster().prepareReroute().get(); + ensureGreen(); + } + @Test @Slow public void testCancellationCleansTempFiles() throws Exception { @@ -453,13 +542,6 @@ public class RelocationTests extends ElasticsearchIntegrationTest { .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); internalCluster().startNodesAsync(2).get(); - - List requests = new ArrayList<>(); - int numDocs = scaledRandomIntBetween(25, 250); - for (int i = 0; i < numDocs; i++) { - requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}")); - } - indexRandom(true, requests); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut()); flush(); @@ -548,4 +630,3 @@ public class RelocationTests extends ElasticsearchIntegrationTest { } } } -