diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dc43d42c94a..772607903e6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -198,8 +198,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; - protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - protected volatile long operationPrimaryTerm; + private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm + private volatile long operationPrimaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1ea90ec6e8f..26bc4c964ee 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -956,18 +956,18 @@ public class IndexShardTests extends IndexShardTestCase { // our operation should be blocked until the previous operations complete assertFalse(onResponse.get()); assertNull(onFailure.get()); - assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm)); + assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation1); // our operation should still be blocked assertFalse(onResponse.get()); assertNull(onFailure.get()); - assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm)); + assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation2); barrier.await(); // now lock acquisition should have succeeded - assertThat(indexShard.operationPrimaryTerm, equalTo(newPrimaryTerm)); + assertThat(indexShard.getOperationPrimaryTerm(), equalTo(newPrimaryTerm)); assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); if (engineClosed) { @@ -1008,7 +1008,7 @@ public class IndexShardTests extends IndexShardTestCase { } }; - final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1; + final long oldPrimaryTerm = indexShard.getPendingPrimaryTerm() - 1; randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(), randomNonNegativeLong(), onLockAcquired, ""); latch.await(); @@ -1030,7 +1030,7 @@ public class IndexShardTests extends IndexShardTestCase { long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); PlainActionFuture fut = new PlainActionFuture<>(); - randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(), + randomReplicaOperationPermitAcquisition(replica, replica.getOperationPrimaryTerm(), replica.getGlobalCheckpoint(), newMaxSeqNoOfUpdates, fut, ""); try (Releasable ignored = fut.actionGet()) { assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates))); @@ -1181,7 +1181,7 @@ public class IndexShardTests extends IndexShardTestCase { final Engine beforeRollbackEngine = indexShard.getEngine(); final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE); randomReplicaOperationPermitAcquisition(indexShard, - indexShard.pendingPrimaryTerm + 1, + indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, newMaxSeqNoOfUpdates, new ActionListener() { @@ -2105,10 +2105,6 @@ public class IndexShardTests extends IndexShardTestCase { new SourceToParse(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - // Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations - // in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations. - shard.pendingPrimaryTerm++; - shard.operationPrimaryTerm++; shard.getEngine().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, "test"); shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, @@ -2118,11 +2114,20 @@ public class IndexShardTests extends IndexShardTestCase { closeShard(shard, false); // Recovering from store should discard doc #1 final ShardRouting replicaRouting = shard.routingEntry(); - IndexShard newShard = reinitShard(shard, - newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, - RecoverySource.ExistingStoreRecoverySource.INSTANCE)); - newShard.pendingPrimaryTerm++; - newShard.operationPrimaryTerm++; + final IndexMetaData newShardIndexMetadata = IndexMetaData.builder(shard.indexSettings().getIndexMetaData()) + .primaryTerm(replicaRouting.shardId().id(), shard.getOperationPrimaryTerm() + 1) + .build(); + closeShards(shard); + IndexShard newShard = newShard( + newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.ExistingStoreRecoverySource.INSTANCE), + shard.shardPath(), + newShardIndexMetadata, + null, + null, + shard.getEngineFactory(), + shard.getGlobalCheckpointSyncer(), + EMPTY_EVENT_LISTENER); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 14e513ff89c..496221ca9fc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -83,7 +83,7 @@ public class ShardGetServiceTests extends IndexShardTestCase { assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); - final long primaryTerm = primary.operationPrimaryTerm; + final long primaryTerm = primary.getOperationPrimaryTerm(); testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");