Recovery API should also report ongoing relocation recoveries

We currently only report relocation related recoveries after they are done.

Closes #6585
This commit is contained in:
Boaz Leskes 2014-06-21 19:23:19 +02:00
parent 155620ed8e
commit ca194594b3
2 changed files with 153 additions and 40 deletions

View File

@ -35,21 +35,21 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -174,7 +174,7 @@ public class TransportRecoveryAction extends
@Override
protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.routingTable().allAssignedShardsGrouped(concreteIndices, true);
return state.routingTable().allAssignedShardsGrouped(concreteIndices, true, true);
}
@Override

View File

@ -29,6 +29,8 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.indices.recovery.RecoveryState.Type;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -40,7 +42,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
@ -60,6 +62,59 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 0;
private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, Type type,
String sourceNode, String targetNode, boolean hasRestoreSource) {
assertThat(state.getShardId().getId(), equalTo(shardId));
assertThat(state.getType(), equalTo(type));
if (sourceNode == null) {
assertNull(state.getSourceNode());
} else {
assertNotNull(state.getSourceNode());
assertThat(state.getSourceNode().getName(), equalTo(sourceNode));
}
if (targetNode == null) {
assertNull(state.getTargetNode());
} else {
assertNotNull(state.getTargetNode());
assertThat(state.getTargetNode().getName(), equalTo(targetNode));
}
if (hasRestoreSource) {
assertNotNull(state.getRestoreSource());
} else {
assertNull(state.getRestoreSource());
}
}
private void assertRecoveryState(RecoveryState state, int shardId, Type type, Stage stage,
String sourceNode, String targetNode, boolean hasRestoreSource) {
assertRecoveryStateWithoutStage(state, shardId, type, sourceNode, targetNode, hasRestoreSource);
assertThat(state.getStage(), equalTo(stage));
}
private void assertOnGoingRecoveryState(RecoveryState state, int shardId, Type type,
String sourceNode, String targetNode, boolean hasRestoreSource) {
assertRecoveryStateWithoutStage(state, shardId, type, sourceNode, targetNode, hasRestoreSource);
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
}
private void slowDownRecovery() {
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "50b")
.put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "10b"))
.get().isAcknowledged());
}
private void restoreRecoverySpeed() {
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
.put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb"))
.get().isAcknowledged());
}
@Test
public void gatewayRecoveryTest() throws Exception {
logger.info("--> start nodes");
@ -82,11 +137,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
ShardRecoveryResponse shardResponse = shardResponses.get(0);
RecoveryState state = shardResponse.recoveryState();
assertThat(state.getType(), equalTo(RecoveryState.Type.GATEWAY));
assertThat(state.getStage(), equalTo(RecoveryState.Stage.DONE));
assertThat(node, equalTo(state.getSourceNode().getName()));
assertThat(node, equalTo(state.getTargetNode().getName()));
assertNull(state.getRestoreSource());
assertRecoveryState(state, 0, Type.GATEWAY, Stage.DONE, node, node, false);
validateIndexRecoveryState(state.getIndex());
}
@ -141,20 +192,12 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
// validate node A recovery
ShardRecoveryResponse nodeAShardResponse = nodeAResponses.get(0);
assertThat(nodeAShardResponse.recoveryState().getShardId().id(), equalTo(0));
assertThat(nodeAShardResponse.recoveryState().getSourceNode().getName(), equalTo(nodeA));
assertThat(nodeAShardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeA));
assertThat(nodeAShardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.GATEWAY));
assertThat(nodeAShardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
assertRecoveryState(nodeAShardResponse.recoveryState(), 0, Type.GATEWAY, Stage.DONE, nodeA, nodeA, false);
validateIndexRecoveryState(nodeAShardResponse.recoveryState().getIndex());
// validate node B recovery
ShardRecoveryResponse nodeBShardResponse = nodeBResponses.get(0);
assertThat(nodeBShardResponse.recoveryState().getShardId().id(), equalTo(0));
assertThat(nodeBShardResponse.recoveryState().getSourceNode().getName(), equalTo(nodeA));
assertThat(nodeBShardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeB));
assertThat(nodeBShardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.REPLICA));
assertThat(nodeBShardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
assertRecoveryState(nodeBShardResponse.recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBShardResponse.recoveryState().getIndex());
}
@ -168,30 +211,104 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
logger.info("--> start node B");
String nodeB = internalCluster().startNode(settingsBuilder().put("gateway.type", "local"));
ensureGreen();
logger.info("--> slowing down recoveries");
slowDownRecovery();
logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId(INDEX_NAME, 0), nodeA, nodeB))
.execute().actionGet().getState();
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(INDEX_NAME);
List<ShardRecoveryResponse> nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses);
assertThat(nodeAResponses.size(), equalTo(1));
List<ShardRecoveryResponse> nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses);
assertThat(nodeBResponses.size(), equalTo(1));
assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.GATEWAY, Stage.DONE, nodeA, nodeA, false);
validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex());
assertOnGoingRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex());
logger.info("--> speeding up recoveries");
restoreRecoverySpeed();
// wait for it to be finished
ensureGreen();
response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
shardResponses = response.shardResponses().get(INDEX_NAME);
assertThat(shardResponses.size(), equalTo(1));
ShardRecoveryResponse shardResponse = shardResponses.get(0);
RecoveryState state = shardResponse.recoveryState();
assertRecoveryState(shardResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(shardResponses.get(0).recoveryState().getIndex());
assertThat(state.getType(), equalTo(RecoveryState.Type.RELOCATION));
assertThat(state.getStage(), equalTo(RecoveryState.Stage.DONE));
assertThat(nodeA, equalTo(state.getSourceNode().getName()));
assertThat(nodeB, equalTo(state.getTargetNode().getName()));
assertNull(state.getRestoreSource());
validateIndexRecoveryState(state.getIndex());
logger.info("--> bump replica count");
client().admin().indices().prepareUpdateSettings(INDEX_NAME)
.setSettings(settingsBuilder().put("number_of_replicas", 1)).execute().actionGet();
ensureGreen();
logger.info("--> start node C");
String nodeC = internalCluster().startNode(settingsBuilder().put("gateway.type", "local"));
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
logger.info("--> slowing down recoveries");
slowDownRecovery();
logger.info("--> move replica shard from: {} to: {}", nodeA, nodeC);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId(INDEX_NAME, 0), nodeA, nodeC))
.execute().actionGet().getState();
response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
shardResponses = response.shardResponses().get(INDEX_NAME);
nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses);
assertThat(nodeAResponses.size(), equalTo(1));
nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses);
assertThat(nodeBResponses.size(), equalTo(1));
List<ShardRecoveryResponse> nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses);
assertThat(nodeCResponses.size(), equalTo(1));
assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false);
validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex());
assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex());
// relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B)
assertOnGoingRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, nodeB, nodeC, false);
validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex());
logger.info("--> speeding up recoveries");
restoreRecoverySpeed();
ensureGreen();
response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
shardResponses = response.shardResponses().get(INDEX_NAME);
nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses);
assertThat(nodeAResponses.size(), equalTo(0));
nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses);
assertThat(nodeBResponses.size(), equalTo(1));
nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses);
assertThat(nodeCResponses.size(), equalTo(1));
assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex());
// relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B)
assertRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeC, false);
validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex());
}
@Test
@ -240,11 +357,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
assertThat(shardRecoveryResponses.size(), equalTo(totalShards));
for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) {
assertThat(shardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.SNAPSHOT));
assertThat(shardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
assertNotNull(shardResponse.recoveryState().getRestoreSource());
assertThat(shardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeA));
assertRecoveryState(shardResponse.recoveryState(), 0, Type.SNAPSHOT, Stage.DONE, null, nodeA, true);
validateIndexRecoveryState(shardResponse.recoveryState().getIndex());
}
}
@ -275,8 +388,8 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex(INDEX_NAME, INDEX_TYPE).
setSource("foo-int-" + i, randomInt(),
"foo-string-" + i, randomAsciiOfLength(32),
"foo-float-" + i, randomFloat());
"foo-string-" + i, randomAsciiOfLength(32),
"foo-float-" + i, randomFloat());
}
indexRandom(true, docs);