diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e5c46cbb0ee..4419d921a3b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -494,12 +494,20 @@ public class ShardStateAction { } } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { - shardStarted(shardRouting, message, listener, clusterService.state()); + public void shardStarted(final ShardRouting shardRouting, + final long primaryTerm, + final String message, + final Listener listener) { + shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state()); } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) { - StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); - sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); + + public void shardStarted(final ShardRouting shardRouting, + final long primaryTerm, + final String message, + final Listener listener, + final ClusterState currentState) { + StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message); + sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); } private static class ShardStartedTransportHandler implements TransportRequestHandler { @@ -544,7 +552,7 @@ public class ShardStateAction { List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates for (StartedShardEntry task : tasks) { - ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); + final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); if (matched == null) { // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale @@ -553,6 +561,19 @@ public class ShardStateAction { logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task); builder.success(task); } else { + if (matched.primary() && task.primaryTerm > 0) { + final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); + assert indexMetaData != null; + final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id()); + if (currentPrimaryTerm != task.primaryTerm) { + assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " + + "current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])"; + logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})", + task.shardId, task, task.primaryTerm, currentPrimaryTerm); + builder.success(task); + continue; + } + } if (matched.initializing() == false) { assert matched.active() : "expected active shard routing for task " + task + " but found " + matched; // same as above, this might have been a stale in-flight request, so we just ignore. @@ -597,6 +618,7 @@ public class ShardStateAction { public static class StartedShardEntry extends TransportRequest { final ShardId shardId; final String allocationId; + final long primaryTerm; final String message; StartedShardEntry(StreamInput in) throws IOException { @@ -604,8 +626,12 @@ public class ShardStateAction { shardId = ShardId.readShardId(in); allocationId = in.readString(); if (in.getVersion().before(Version.V_6_3_0)) { - final long primaryTerm = in.readVLong(); + primaryTerm = in.readVLong(); assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]"; + } else if (in.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + primaryTerm = in.readVLong(); + } else { + primaryTerm = UNASSIGNED_PRIMARY_TERM; } this.message = in.readString(); if (in.getVersion().before(Version.V_6_3_0)) { @@ -614,9 +640,10 @@ public class ShardStateAction { } } - public StartedShardEntry(ShardId shardId, String allocationId, String message) { + public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) { this.shardId = shardId; this.allocationId = allocationId; + this.primaryTerm = primaryTerm; this.message = message; } @@ -627,6 +654,8 @@ public class ShardStateAction { out.writeString(allocationId); if (out.getVersion().before(Version.V_6_3_0)) { out.writeVLong(0L); + } else if (out.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + out.writeVLong(primaryTerm); } out.writeString(message); if (out.getVersion().before(Version.V_6_3_0)) { @@ -636,8 +665,8 @@ public class ShardStateAction { @Override public String toString() { - return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}", - shardId, allocationId, message); + return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}", + shardId, allocationId, primaryTerm, message); } } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 80ac05ece82..5955a749fea 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -575,13 +575,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple } try { - logger.debug("{} creating shard", shardRouting.shardId()); + final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); + logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard( shardRouting, recoveryState, recoveryTargetService, - new RecoveryListener(shardRouting), + new RecoveryListener(shardRouting, primaryTerm), repositoriesService, failedShardHandler, globalCheckpointSyncer, @@ -598,9 +599,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple "local shard has a different allocation id but wasn't cleaning by removeShards. " + "cluster state: " + shardRouting + " local: " + currentRoutingEntry; + final long primaryTerm; try { final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex()); - final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); + primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); final Set inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id()); final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); final Set pre60AllocationIds = indexShardRoutingTable.assignedShards() @@ -633,7 +635,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple shardRouting.shardId(), state, nodes.getMasterNode()); } if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() + + shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + " marked shard as initializing, but shard state is [" + state + "], mark shard as started", SHARD_STATE_ACTION_LISTENER, clusterState); } @@ -673,15 +675,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { + /** + * ShardRouting with which the shard was created + */ private final ShardRouting shardRouting; - private RecoveryListener(ShardRouting shardRouting) { + /** + * Primary term with which the shard was created + */ + private final long primaryTerm; + + private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) { this.shardRouting = shardRouting; + this.primaryTerm = primaryTerm; } @Override - public void onRecoveryDone(RecoveryState state) { - shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + public void onRecoveryDone(final RecoveryState state) { + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 1d3a523cdc9..20b7548004f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -34,7 +35,6 @@ import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -64,19 +64,19 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC public void testEmptyTaskListProducesSameClusterState() throws Exception { final ClusterState clusterState = stateWithNoShard(); - assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, Collections.emptyList()); + assertSame(clusterState, result.resultingState); } public void testNonExistentIndexMarkedAsSuccessful() throws Exception { final ClusterState clusterState = stateWithNoShard(); - final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); - assertTasksExecution(clusterState, singletonList(entry), - result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(1)); - assertThat(result.executionResults.containsKey(entry), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); - }); + final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test"); + + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(entry)); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(entry), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); } public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { @@ -87,20 +87,19 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC final List tasks = Stream.concat( // Existent shard id but different allocation id IntStream.range(0, randomIntBetween(1, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")), + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), 0L, "allocation id")), // Non existent shard id IntStream.range(1, randomIntBetween(2, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id")) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), 0L, "shard id")) ).collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); }); } @@ -119,16 +118,16 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC } else { allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); } - return new StartedShardEntry(shardId, allocationId, "test"); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); + return new StartedShardEntry(shardId, allocationId, primaryTerm, "test"); }).collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); }); } @@ -138,26 +137,26 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final String primaryAllocationId = primaryShard.allocationId().getId(); final List tasks = new ArrayList<>(); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test")); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test")); if (randomBoolean()) { final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); final String replicaAllocationId = replicaShard.allocationId().getId(); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test")); } - assertTasksExecution(clusterState, tasks, result -> { - assertNotSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); - }); + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); } @@ -169,29 +168,88 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final String allocationId = shardRouting.allocationId().getId(); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); final List tasks = IntStream.range(0, randomIntBetween(2, 10)) - .mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test")) .collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertNotSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); - }); + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); } - private void assertTasksExecution(final ClusterState state, - final List tasks, - final Consumer consumer) throws Exception { + public void testPrimaryTermsMismatch() throws Exception { + final String indexName = "test"; + final int shard = 0; + final int primaryTerm = 2 + randomInt(200); + + ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + clusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(clusterState.metaData().index(indexName)) + .primaryTerm(shard, primaryTerm) + .build(), true) + .build()) + .build(); + final ShardId shardId = new ShardId(clusterState.metaData().index(indexName).getIndex(), shard); + final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); + { + final StartedShardEntry task = + new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary"); + + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.INITIALIZING)); + assertSame(clusterState, result.resultingState); + } + { + final StartedShardEntry task = + new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary"); + + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + assertNotSame(clusterState, result.resultingState); + clusterState = result.resultingState; + } + { + final long replicaPrimaryTerm = randomBoolean() ? primaryTerm : primaryTerm - 1; + final String replicaAllocationId = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next() + .allocationId().getId(); + + final StartedShardEntry task = new StartedShardEntry(shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica"); + + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + assertNotSame(clusterState, result.resultingState); + } + } + + private ClusterStateTaskExecutor.ClusterTasksResult executeTasks(final ClusterState state, + final List tasks) throws Exception { final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); assertThat(result, notNullValue()); - consumer.accept(result); + return result; } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index e94a974ae7a..a800c0c7992 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -72,12 +72,14 @@ import java.util.function.Predicate; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class ShardStateActionTests extends ESTestCase { @@ -420,8 +422,9 @@ public class ShardStateActionTests extends ESTestCase { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); final ShardRouting shardRouting = getRandomShardRouting(index); + final long primaryTerm = clusterService.state().metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); final TestListener listener = new TestListener(); - shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); + shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); @@ -429,6 +432,7 @@ public class ShardStateActionTests extends ESTestCase { ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); + assertThat(entry.primaryTerm, equalTo(primaryTerm)); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); @@ -481,7 +485,7 @@ public class ShardStateActionTests extends ESTestCase { final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) { + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, 0L, reason), bwcVersion).streamInput()) { in.setVersion(bwcVersion); final FailedShardEntry failedShardEntry = new FailedShardEntry(in); assertThat(failedShardEntry.shardId, equalTo(shardId)); @@ -490,8 +494,7 @@ public class ShardStateActionTests extends ESTestCase { assertThat(failedShardEntry.failure, nullValue()); assertThat(failedShardEntry.markAsStale, equalTo(true)); } - try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, - reason, null, false), bwcVersion).streamInput()) { + try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) { in.setVersion(bwcVersion); final StartedShardEntry startedShardEntry = new StartedShardEntry(in); assertThat(startedShardEntry.shardId, equalTo(shardId)); @@ -500,6 +503,56 @@ public class ShardStateActionTests extends ESTestCase { } } + public void testFailedShardEntrySerialization() throws Exception { + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final long primaryTerm = randomIntBetween(0, 100); + final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final Exception failure = randomBoolean() ? null : getSimulatedFailure(); + final boolean markAsStale = randomBoolean(); + + final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); + final FailedShardEntry failedShardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); + try (StreamInput in = serialize(failedShardEntry, version).streamInput()) { + in.setVersion(version); + final FailedShardEntry deserialized = new FailedShardEntry(in); + assertThat(deserialized.shardId, equalTo(shardId)); + assertThat(deserialized.allocationId, equalTo(allocationId)); + assertThat(deserialized.primaryTerm, equalTo(primaryTerm)); + assertThat(deserialized.message, equalTo(message)); + if (failure != null) { + assertThat(deserialized.failure, notNullValue()); + assertThat(deserialized.failure.getClass(), equalTo(failure.getClass())); + assertThat(deserialized.failure.getMessage(), equalTo(failure.getMessage())); + } else { + assertThat(deserialized.failure, nullValue()); + } + assertThat(deserialized.markAsStale, equalTo(markAsStale)); + assertEquals(failedShardEntry, deserialized); + } + } + + public void testStartedShardEntrySerialization() throws Exception { + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final long primaryTerm = randomIntBetween(0, 100); + final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + + final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, primaryTerm, message), version).streamInput()) { + in.setVersion(version); + final StartedShardEntry deserialized = new StartedShardEntry(in); + assertThat(deserialized.shardId, equalTo(shardId)); + assertThat(deserialized.allocationId, equalTo(allocationId)); + if (version.onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + assertThat(deserialized.primaryTerm, equalTo(primaryTerm)); + } else { + assertThat(deserialized.primaryTerm, equalTo(0L)); + } + assertThat(deserialized.message, equalTo(message)); + } + } + BytesReference serialize(Writeable writeable, Version version) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(version); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index f248d46b117..9b6cae43081 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -361,11 +361,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC assertThat(this.shardId(), equalTo(shardRouting.shardId())); assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting)); if (this.shardRouting.active()) { - assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting, + assertTrue("an active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting, shardRouting.active()); } if (this.shardRouting.primary()) { assertTrue("a primary shard can't be demoted", shardRouting.primary()); + if (this.shardRouting.initializing()) { + assertEquals("primary term can not be updated on an initializing primary shard: " + shardRouting, term, newPrimaryTerm); + } } else if (shardRouting.primary()) { // note: it's ok for a replica in post recovery to be started and promoted at once // this can happen when the primary failed after we sent the start shard message @@ -390,6 +393,10 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC return null; } + public long term() { + return term; + } + public void updateTerm(long newTerm) { assertThat("term can only be incremented: " + shardRouting, newTerm, greaterThanOrEqualTo(term)); if (shardRouting.primary() && shardRouting.active()) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 8a00be28f5e..c1e32be9d29 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -277,10 +277,18 @@ public class ClusterStateChanges { } public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { - List entries = startedShards.stream().map(startedShard -> - new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started")) - .collect(Collectors.toList()); - return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries); + final Map entries = startedShards.stream() + .collect(Collectors.toMap(Function.identity(), startedShard -> { + final IndexMetaData indexMetaData = clusterState.metaData().index(startedShard.shardId().getIndex()); + return indexMetaData != null ? indexMetaData.primaryTerm(startedShard.shardId().id()) : 0L; + })); + return applyStartedShards(clusterState, entries); + } + + public ClusterState applyStartedShards(ClusterState clusterState, Map startedShards) { + return runTasks(shardStartedClusterStateTaskExecutor, clusterState, startedShards.entrySet().stream() + .map(e -> new StartedShardEntry(e.getKey().shardId(), e.getKey().allocationId().getId(), e.getValue(), "shard started")) + .collect(Collectors.toList())); } private ClusterState runTasks(ClusterStateTaskExecutor executor, ClusterState clusterState, List entries) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index b400b56b34d..e664cc87452 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -384,7 +384,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice } // randomly start and fail allocated shards - List startedShards = new ArrayList<>(); + final Map startedShards = new HashMap<>(); List failedShards = new ArrayList<>(); for (DiscoveryNode node : state.nodes()) { IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); @@ -393,7 +393,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice for (MockIndexShard indexShard : indexService) { ShardRouting persistedShardRouting = indexShard.routingEntry(); if (persistedShardRouting.initializing() && randomBoolean()) { - startedShards.add(persistedShardRouting); + startedShards.put(persistedShardRouting, indexShard.term()); } else if (rarely()) { failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean())); }