Make primary terms fields private in index shard (#38036)

This commit encapsulates the primary terms fields in index shard. This
is a precursor to pushing the operation primary term down to the
replication tracker.
This commit is contained in:
Jason Tedor 2019-01-30 12:56:58 -05:00 committed by GitHub
parent 9782aaa1b8
commit c468b2f7ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 18 deletions

View File

@ -198,8 +198,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile ShardRouting shardRouting; protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state; protected volatile IndexShardState state;
protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected volatile long operationPrimaryTerm; private volatile long operationPrimaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>(); protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory; final EngineFactory engineFactory;

View File

@ -956,18 +956,18 @@ public class IndexShardTests extends IndexShardTestCase {
// our operation should be blocked until the previous operations complete // our operation should be blocked until the previous operations complete
assertFalse(onResponse.get()); assertFalse(onResponse.get());
assertNull(onFailure.get()); assertNull(onFailure.get());
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm)); assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
Releasables.close(operation1); Releasables.close(operation1);
// our operation should still be blocked // our operation should still be blocked
assertFalse(onResponse.get()); assertFalse(onResponse.get());
assertNull(onFailure.get()); assertNull(onFailure.get());
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm)); assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
Releasables.close(operation2); Releasables.close(operation2);
barrier.await(); barrier.await();
// now lock acquisition should have succeeded // now lock acquisition should have succeeded
assertThat(indexShard.operationPrimaryTerm, equalTo(newPrimaryTerm)); assertThat(indexShard.getOperationPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm)); assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
if (engineClosed) { if (engineClosed) {
@ -1008,7 +1008,7 @@ public class IndexShardTests extends IndexShardTestCase {
} }
}; };
final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1; final long oldPrimaryTerm = indexShard.getPendingPrimaryTerm() - 1;
randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(), randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(),
randomNonNegativeLong(), onLockAcquired, ""); randomNonNegativeLong(), onLockAcquired, "");
latch.await(); latch.await();
@ -1030,7 +1030,7 @@ public class IndexShardTests extends IndexShardTestCase {
long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
PlainActionFuture<Releasable> fut = new PlainActionFuture<>(); PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(), randomReplicaOperationPermitAcquisition(replica, replica.getOperationPrimaryTerm(), replica.getGlobalCheckpoint(),
newMaxSeqNoOfUpdates, fut, ""); newMaxSeqNoOfUpdates, fut, "");
try (Releasable ignored = fut.actionGet()) { try (Releasable ignored = fut.actionGet()) {
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates))); assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates)));
@ -1181,7 +1181,7 @@ public class IndexShardTests extends IndexShardTestCase {
final Engine beforeRollbackEngine = indexShard.getEngine(); final Engine beforeRollbackEngine = indexShard.getEngine();
final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE); final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE);
randomReplicaOperationPermitAcquisition(indexShard, randomReplicaOperationPermitAcquisition(indexShard,
indexShard.pendingPrimaryTerm + 1, indexShard.getPendingPrimaryTerm() + 1,
globalCheckpoint, globalCheckpoint,
newMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates,
new ActionListener<Releasable>() { new ActionListener<Releasable>() {
@ -2105,10 +2105,6 @@ public class IndexShardTests extends IndexShardTestCase {
new SourceToParse(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); new SourceToParse(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard); flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations
// in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations.
shard.pendingPrimaryTerm++;
shard.operationPrimaryTerm++;
shard.getEngine().rollTranslogGeneration(); shard.getEngine().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, "test"); shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
@ -2118,11 +2114,20 @@ public class IndexShardTests extends IndexShardTestCase {
closeShard(shard, false); closeShard(shard, false);
// Recovering from store should discard doc #1 // Recovering from store should discard doc #1
final ShardRouting replicaRouting = shard.routingEntry(); final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard, final IndexMetaData newShardIndexMetadata = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, .primaryTerm(replicaRouting.shardId().id(), shard.getOperationPrimaryTerm() + 1)
RecoverySource.ExistingStoreRecoverySource.INSTANCE)); .build();
newShard.pendingPrimaryTerm++; closeShards(shard);
newShard.operationPrimaryTerm++; IndexShard newShard = newShard(
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE),
shard.shardPath(),
newShardIndexMetadata,
null,
null,
shard.getEngineFactory(),
shard.getGlobalCheckpointSyncer(),
EMPTY_EVENT_LISTENER);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore()); assertTrue(newShard.recoverFromStore());

View File

@ -83,7 +83,7 @@ public class ShardGetServiceTests extends IndexShardTestCase {
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
final long primaryTerm = primary.operationPrimaryTerm; final long primaryTerm = primary.getOperationPrimaryTerm();
testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");