Ignore shard started requests when primary term does not match (#37899)

This commit changes the StartedShardEntry so that it also contains the 
primary term of the shard to start. This way the master node can also 
checks that the primary term from the start request is equal to the current 
shard's primary term in the cluster state, and it can ignore any shard 
started request that would concerns a previous instance of the shard that 
would have been allocated to the same node.

Such situation are likely to happen with frozen (or restored) indices and 
the replication of closed indices, because with replicated closed indices 
the shards will be initialized again after the index is closed and can 
potentially be re initialized again if the index is reopened as a frozen 
index. In such cases the lifecycle of the shards would be something like:
* shard is STARTED
* index is closed
* shards is INITIALIZING (index state is CLOSED, primary term is X)
* index is reopened
* shards are INITIALIZING again (index state is OPENED, potentially frozen, 
primary term is X+1)

Adding the primary term to the shard started request will allow to discard 
potential StartedShardEntry requests received by the master node if the 
request concerns the shard with primary term X because it has been 
moved/reinitialized in the meanwhile under the primary term X+1.

Relates to #33888
This commit is contained in:
Tanguy Leroux 2019-01-29 15:09:40 +01:00 committed by GitHub
parent 7f1784e9f9
commit 5d1964bcbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 246 additions and 80 deletions

View File

@ -494,12 +494,20 @@ public class ShardStateAction {
} }
} }
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { public void shardStarted(final ShardRouting shardRouting,
shardStarted(shardRouting, message, listener, clusterService.state()); final long primaryTerm,
final String message,
final Listener listener) {
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
} }
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) {
StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); public void shardStarted(final ShardRouting shardRouting,
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); final long primaryTerm,
final String message,
final Listener listener,
final ClusterState currentState) {
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
} }
private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> { private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
@ -544,7 +552,7 @@ public class ShardStateAction {
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
for (StartedShardEntry task : tasks) { for (StartedShardEntry task : tasks) {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) { if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
@ -553,6 +561,19 @@ public class ShardStateAction {
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task); logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
builder.success(task); builder.success(task);
} else { } else {
if (matched.primary() && task.primaryTerm > 0) {
final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
assert indexMetaData != null;
final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " +
"current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})",
task.shardId, task, task.primaryTerm, currentPrimaryTerm);
builder.success(task);
continue;
}
}
if (matched.initializing() == false) { if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched; assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore. // same as above, this might have been a stale in-flight request, so we just ignore.
@ -597,6 +618,7 @@ public class ShardStateAction {
public static class StartedShardEntry extends TransportRequest { public static class StartedShardEntry extends TransportRequest {
final ShardId shardId; final ShardId shardId;
final String allocationId; final String allocationId;
final long primaryTerm;
final String message; final String message;
StartedShardEntry(StreamInput in) throws IOException { StartedShardEntry(StreamInput in) throws IOException {
@ -604,8 +626,12 @@ public class ShardStateAction {
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
allocationId = in.readString(); allocationId = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) { if (in.getVersion().before(Version.V_6_3_0)) {
final long primaryTerm = in.readVLong(); primaryTerm = in.readVLong();
assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]"; assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]";
} else if (in.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
primaryTerm = in.readVLong();
} else {
primaryTerm = UNASSIGNED_PRIMARY_TERM;
} }
this.message = in.readString(); this.message = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) { if (in.getVersion().before(Version.V_6_3_0)) {
@ -614,9 +640,10 @@ public class ShardStateAction {
} }
} }
public StartedShardEntry(ShardId shardId, String allocationId, String message) { public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
this.shardId = shardId; this.shardId = shardId;
this.allocationId = allocationId; this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message; this.message = message;
} }
@ -627,6 +654,8 @@ public class ShardStateAction {
out.writeString(allocationId); out.writeString(allocationId);
if (out.getVersion().before(Version.V_6_3_0)) { if (out.getVersion().before(Version.V_6_3_0)) {
out.writeVLong(0L); out.writeVLong(0L);
} else if (out.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
out.writeVLong(primaryTerm);
} }
out.writeString(message); out.writeString(message);
if (out.getVersion().before(Version.V_6_3_0)) { if (out.getVersion().before(Version.V_6_3_0)) {
@ -636,8 +665,8 @@ public class ShardStateAction {
@Override @Override
public String toString() { public String toString() {
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}", return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}",
shardId, allocationId, message); shardId, allocationId, primaryTerm, message);
} }
} }

View File

@ -575,13 +575,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
} }
try { try {
logger.debug("{} creating shard", shardRouting.shardId()); final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard( indicesService.createShard(
shardRouting, shardRouting,
recoveryState, recoveryState,
recoveryTargetService, recoveryTargetService,
new RecoveryListener(shardRouting), new RecoveryListener(shardRouting, primaryTerm),
repositoriesService, repositoriesService,
failedShardHandler, failedShardHandler,
globalCheckpointSyncer, globalCheckpointSyncer,
@ -598,9 +599,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
"local shard has a different allocation id but wasn't cleaning by removeShards. " "local shard has a different allocation id but wasn't cleaning by removeShards. "
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry; + "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
final long primaryTerm;
try { try {
final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex()); final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex());
final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id()); final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards() final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
@ -633,7 +635,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
shardRouting.shardId(), state, nodes.getMasterNode()); shardRouting.shardId(), state, nodes.getMasterNode());
} }
if (nodes.getMasterNode() != null) { if (nodes.getMasterNode() != null) {
shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() + shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() +
" marked shard as initializing, but shard state is [" + state + "], mark shard as started", " marked shard as initializing, but shard state is [" + state + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER, clusterState); SHARD_STATE_ACTION_LISTENER, clusterState);
} }
@ -673,15 +675,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting; private final ShardRouting shardRouting;
private RecoveryListener(ShardRouting shardRouting) { /**
* Primary term with which the shard was created
*/
private final long primaryTerm;
private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting; this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
} }
@Override @Override
public void onRecoveryDone(RecoveryState state) { public void onRecoveryDone(final RecoveryState state) {
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -34,7 +35,6 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -64,19 +64,19 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
public void testEmptyTaskListProducesSameClusterState() throws Exception { public void testEmptyTaskListProducesSameClusterState() throws Exception {
final ClusterState clusterState = stateWithNoShard(); final ClusterState clusterState = stateWithNoShard();
assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, Collections.emptyList());
assertSame(clusterState, result.resultingState);
} }
public void testNonExistentIndexMarkedAsSuccessful() throws Exception { public void testNonExistentIndexMarkedAsSuccessful() throws Exception {
final ClusterState clusterState = stateWithNoShard(); final ClusterState clusterState = stateWithNoShard();
final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test");
assertTasksExecution(clusterState, singletonList(entry),
result -> { final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(entry));
assertSame(clusterState, result.resultingState); assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1)); assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(entry), is(true)); assertThat(result.executionResults.containsKey(entry), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true));
});
} }
public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
@ -87,21 +87,20 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
final List<StartedShardEntry> tasks = Stream.concat( final List<StartedShardEntry> tasks = Stream.concat(
// Existent shard id but different allocation id // Existent shard id but different allocation id
IntStream.range(0, randomIntBetween(1, 5)) IntStream.range(0, randomIntBetween(1, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")), .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), 0L, "allocation id")),
// Non existent shard id // Non existent shard id
IntStream.range(1, randomIntBetween(2, 5)) IntStream.range(1, randomIntBetween(2, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id")) .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), 0L, "shard id"))
).collect(Collectors.toList()); ).collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> { final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks);
assertSame(clusterState, result.resultingState); assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size())); assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> { tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true)); assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
}); });
});
} }
public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
@ -119,17 +118,17 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
} else { } else {
allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId();
} }
return new StartedShardEntry(shardId, allocationId, "test"); final long primaryTerm = indexMetaData.primaryTerm(shardId.id());
return new StartedShardEntry(shardId, allocationId, primaryTerm, "test");
}).collect(Collectors.toList()); }).collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> { final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks);
assertSame(clusterState, result.resultingState); assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size())); assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> { tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true)); assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
}); });
});
} }
public void testStartedShards() throws Exception { public void testStartedShards() throws Exception {
@ -138,17 +137,18 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final long primaryTerm = indexMetaData.primaryTerm(shardId.id());
final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String primaryAllocationId = primaryShard.allocationId().getId(); final String primaryAllocationId = primaryShard.allocationId().getId();
final List<StartedShardEntry> tasks = new ArrayList<>(); final List<StartedShardEntry> tasks = new ArrayList<>();
tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test")); tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test"));
if (randomBoolean()) { if (randomBoolean()) {
final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next();
final String replicaAllocationId = replicaShard.allocationId().getId(); final String replicaAllocationId = replicaShard.allocationId().getId();
tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test"));
} }
assertTasksExecution(clusterState, tasks, result -> { final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks);
assertNotSame(clusterState, result.resultingState); assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size())); assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> { tasks.forEach(task -> {
@ -158,7 +158,6 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
}); });
});
} }
public void testDuplicateStartsAreOkay() throws Exception { public void testDuplicateStartsAreOkay() throws Exception {
@ -169,12 +168,13 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String allocationId = shardRouting.allocationId().getId(); final String allocationId = shardRouting.allocationId().getId();
final long primaryTerm = indexMetaData.primaryTerm(shardId.id());
final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(2, 10)) final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(2, 10))
.mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test"))
.collect(Collectors.toList()); .collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> { final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks);
assertNotSame(clusterState, result.resultingState); assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size())); assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> { tasks.forEach(task -> {
@ -184,14 +184,72 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
}); });
});
} }
private void assertTasksExecution(final ClusterState state, public void testPrimaryTermsMismatch() throws Exception {
final List<StartedShardEntry> tasks, final String indexName = "test";
final Consumer<ClusterStateTaskExecutor.ClusterTasksResult> consumer) throws Exception { final int shard = 0;
final int primaryTerm = 2 + randomInt(200);
ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING);
clusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
.put(IndexMetaData.builder(clusterState.metaData().index(indexName))
.primaryTerm(shard, primaryTerm)
.build(), true)
.build())
.build();
final ShardId shardId = new ShardId(clusterState.metaData().index(indexName).getIndex(), shard);
final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId();
{
final StartedShardEntry task =
new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary");
final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task));
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.INITIALIZING));
assertSame(clusterState, result.resultingState);
}
{
final StartedShardEntry task =
new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary");
final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task));
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
assertNotSame(clusterState, result.resultingState);
clusterState = result.resultingState;
}
{
final long replicaPrimaryTerm = randomBoolean() ? primaryTerm : primaryTerm - 1;
final String replicaAllocationId = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next()
.allocationId().getId();
final StartedShardEntry task = new StartedShardEntry(shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica");
final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task));
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
assertNotSame(clusterState, result.resultingState);
}
}
private ClusterStateTaskExecutor.ClusterTasksResult executeTasks(final ClusterState state,
final List<StartedShardEntry> tasks) throws Exception {
final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(state, tasks); final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(state, tasks);
assertThat(result, notNullValue()); assertThat(result, notNullValue());
consumer.accept(result); return result;
} }
} }

View File

@ -72,12 +72,14 @@ import java.util.function.Predicate;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class ShardStateActionTests extends ESTestCase { public class ShardStateActionTests extends ESTestCase {
@ -420,8 +422,9 @@ public class ShardStateActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
final ShardRouting shardRouting = getRandomShardRouting(index); final ShardRouting shardRouting = getRandomShardRouting(index);
final long primaryTerm = clusterService.state().metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
final TestListener listener = new TestListener(); final TestListener listener = new TestListener();
shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", listener);
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class));
@ -429,6 +432,7 @@ public class ShardStateActionTests extends ESTestCase {
ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request;
assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.shardId, equalTo(shardRouting.shardId()));
assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId()));
assertThat(entry.primaryTerm, equalTo(primaryTerm));
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
listener.await(); listener.await();
@ -481,7 +485,7 @@ public class ShardStateActionTests extends ESTestCase {
final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000));
final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) { try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, 0L, reason), bwcVersion).streamInput()) {
in.setVersion(bwcVersion); in.setVersion(bwcVersion);
final FailedShardEntry failedShardEntry = new FailedShardEntry(in); final FailedShardEntry failedShardEntry = new FailedShardEntry(in);
assertThat(failedShardEntry.shardId, equalTo(shardId)); assertThat(failedShardEntry.shardId, equalTo(shardId));
@ -490,8 +494,7 @@ public class ShardStateActionTests extends ESTestCase {
assertThat(failedShardEntry.failure, nullValue()); assertThat(failedShardEntry.failure, nullValue());
assertThat(failedShardEntry.markAsStale, equalTo(true)); assertThat(failedShardEntry.markAsStale, equalTo(true));
} }
try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) {
reason, null, false), bwcVersion).streamInput()) {
in.setVersion(bwcVersion); in.setVersion(bwcVersion);
final StartedShardEntry startedShardEntry = new StartedShardEntry(in); final StartedShardEntry startedShardEntry = new StartedShardEntry(in);
assertThat(startedShardEntry.shardId, equalTo(shardId)); assertThat(startedShardEntry.shardId, equalTo(shardId));
@ -500,6 +503,56 @@ public class ShardStateActionTests extends ESTestCase {
} }
} }
public void testFailedShardEntrySerialization() throws Exception {
final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000));
final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final long primaryTerm = randomIntBetween(0, 100);
final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final Exception failure = randomBoolean() ? null : getSimulatedFailure();
final boolean markAsStale = randomBoolean();
final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT));
final FailedShardEntry failedShardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
try (StreamInput in = serialize(failedShardEntry, version).streamInput()) {
in.setVersion(version);
final FailedShardEntry deserialized = new FailedShardEntry(in);
assertThat(deserialized.shardId, equalTo(shardId));
assertThat(deserialized.allocationId, equalTo(allocationId));
assertThat(deserialized.primaryTerm, equalTo(primaryTerm));
assertThat(deserialized.message, equalTo(message));
if (failure != null) {
assertThat(deserialized.failure, notNullValue());
assertThat(deserialized.failure.getClass(), equalTo(failure.getClass()));
assertThat(deserialized.failure.getMessage(), equalTo(failure.getMessage()));
} else {
assertThat(deserialized.failure, nullValue());
}
assertThat(deserialized.markAsStale, equalTo(markAsStale));
assertEquals(failedShardEntry, deserialized);
}
}
public void testStartedShardEntrySerialization() throws Exception {
final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000));
final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final long primaryTerm = randomIntBetween(0, 100);
final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100);
final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT));
try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, primaryTerm, message), version).streamInput()) {
in.setVersion(version);
final StartedShardEntry deserialized = new StartedShardEntry(in);
assertThat(deserialized.shardId, equalTo(shardId));
assertThat(deserialized.allocationId, equalTo(allocationId));
if (version.onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
assertThat(deserialized.primaryTerm, equalTo(primaryTerm));
} else {
assertThat(deserialized.primaryTerm, equalTo(0L));
}
assertThat(deserialized.message, equalTo(message));
}
}
BytesReference serialize(Writeable writeable, Version version) throws IOException { BytesReference serialize(Writeable writeable, Version version) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) { try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version); out.setVersion(version);

View File

@ -361,11 +361,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
assertThat(this.shardId(), equalTo(shardRouting.shardId())); assertThat(this.shardId(), equalTo(shardRouting.shardId()));
assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting)); assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting));
if (this.shardRouting.active()) { if (this.shardRouting.active()) {
assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting, assertTrue("an active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
shardRouting.active()); shardRouting.active());
} }
if (this.shardRouting.primary()) { if (this.shardRouting.primary()) {
assertTrue("a primary shard can't be demoted", shardRouting.primary()); assertTrue("a primary shard can't be demoted", shardRouting.primary());
if (this.shardRouting.initializing()) {
assertEquals("primary term can not be updated on an initializing primary shard: " + shardRouting, term, newPrimaryTerm);
}
} else if (shardRouting.primary()) { } else if (shardRouting.primary()) {
// note: it's ok for a replica in post recovery to be started and promoted at once // note: it's ok for a replica in post recovery to be started and promoted at once
// this can happen when the primary failed after we sent the start shard message // this can happen when the primary failed after we sent the start shard message
@ -390,6 +393,10 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
return null; return null;
} }
public long term() {
return term;
}
public void updateTerm(long newTerm) { public void updateTerm(long newTerm) {
assertThat("term can only be incremented: " + shardRouting, newTerm, greaterThanOrEqualTo(term)); assertThat("term can only be incremented: " + shardRouting, newTerm, greaterThanOrEqualTo(term));
if (shardRouting.primary() && shardRouting.active()) { if (shardRouting.primary() && shardRouting.active()) {

View File

@ -277,10 +277,18 @@ public class ClusterStateChanges {
} }
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) { public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
List<StartedShardEntry> entries = startedShards.stream().map(startedShard -> final Map<ShardRouting, Long> entries = startedShards.stream()
new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started")) .collect(Collectors.toMap(Function.identity(), startedShard -> {
.collect(Collectors.toList()); final IndexMetaData indexMetaData = clusterState.metaData().index(startedShard.shardId().getIndex());
return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries); return indexMetaData != null ? indexMetaData.primaryTerm(startedShard.shardId().id()) : 0L;
}));
return applyStartedShards(clusterState, entries);
}
public ClusterState applyStartedShards(ClusterState clusterState, Map<ShardRouting, Long> startedShards) {
return runTasks(shardStartedClusterStateTaskExecutor, clusterState, startedShards.entrySet().stream()
.map(e -> new StartedShardEntry(e.getKey().shardId(), e.getKey().allocationId().getId(), e.getValue(), "shard started"))
.collect(Collectors.toList()));
} }
private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) { private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) {

View File

@ -384,7 +384,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
} }
// randomly start and fail allocated shards // randomly start and fail allocated shards
List<ShardRouting> startedShards = new ArrayList<>(); final Map<ShardRouting, Long> startedShards = new HashMap<>();
List<FailedShard> failedShards = new ArrayList<>(); List<FailedShard> failedShards = new ArrayList<>();
for (DiscoveryNode node : state.nodes()) { for (DiscoveryNode node : state.nodes()) {
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
@ -393,7 +393,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
for (MockIndexShard indexShard : indexService) { for (MockIndexShard indexShard : indexService) {
ShardRouting persistedShardRouting = indexShard.routingEntry(); ShardRouting persistedShardRouting = indexShard.routingEntry();
if (persistedShardRouting.initializing() && randomBoolean()) { if (persistedShardRouting.initializing() && randomBoolean()) {
startedShards.add(persistedShardRouting); startedShards.put(persistedShardRouting, indexShard.term());
} else if (rarely()) { } else if (rarely()) {
failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean())); failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean()));
} }