Fill gaps on primary promotion

When a primary is promoted, it could have gaps in its history due to
concurrency and in-flight operations when it was serving as a
replica. This commit fills the gaps in the history of the promoted shard
after all operations from the previous term have drained, and future
operations are blocked. This commit does not handle replicating the
no-ops that fill the gaps to any remaining replicas; that is the
responsibility of the primary/replica sync that we are laying the
groundwork for.

Relates #24945
This commit is contained in:
Jason Tedor 2017-05-30 13:19:44 -04:00 committed by GitHub
parent ce7195d81a
commit b28141a990
2 changed files with 72 additions and 2 deletions

View File

@ -369,7 +369,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
latch::await,
() -> {
latch.await();
getEngine().fillSeqNoGaps(newPrimaryTerm);
},
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();

View File

@ -131,7 +131,9 @@ import static java.util.Collections.emptySet;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.VersionType.EXTERNAL;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
@ -141,6 +143,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
@ -388,6 +391,70 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
/**
 * Indexes a random subset of sequence numbers on a replica shard (leaving holes in its history), promotes the shard to primary
 * by bumping the primary term, and checks that the local checkpoint has caught up to the maximum sequence number once the
 * promotion has completed.
 */
public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
    final IndexShard indexShard = newStartedShard(false);

    // most of the time this is large enough that there will be at least one gap
    final int operations = 1024 - scaledRandomIntBetween(0, 1024);

    int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
    // true once at least one sequence number has been skipped
    boolean skipped = false;
    // true only if a skipped sequence number was later followed by an indexed one; trailing skips do not leave a hole below max
    boolean gap = false;
    for (int i = 0; i < operations; i++) {
        final String id = Integer.toString(i);
        final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
        if (!rarely()) {
            final Term uid = new Term("_id", doc.id());
            final Engine.Index index =
                    new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false);
            indexShard.index(index);
            max = i;
            gap |= skipped;
        } else {
            skipped = true;
        }
    }

    final int maxSeqNo = max;
    if (gap) {
        /*
         * Compare as a long, like the final assertion below: equalTo relies on Object#equals, and a boxed Long is never equal to
         * a boxed Integer, so comparing against the int would make this check pass unconditionally.
         */
        assertThat(indexShard.getLocalCheckpoint(), not(equalTo((long) maxSeqNo)));
    }

    // promote the replica
    final ShardRouting replicaRouting = indexShard.routingEntry();
    final ShardRouting primaryRouting =
            TestShardRouting.newShardRouting(
                    replicaRouting.shardId(),
                    replicaRouting.currentNodeId(),
                    null,
                    true,
                    ShardRoutingState.STARTED,
                    replicaRouting.allocationId());
    indexShard.updateRoutingEntry(primaryRouting);
    indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);

    /*
     * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
     * gaps are filled.
     */
    final CountDownLatch latch = new CountDownLatch(1);
    indexShard.acquirePrimaryOperationPermit(
            new ActionListener<Releasable>() {
                @Override
                public void onResponse(Releasable releasable) {
                    releasable.close();
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    throw new RuntimeException(e);
                }
            },
            ThreadPool.Names.GENERIC);

    latch.await();
    assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo));
    closeShards(indexShard);
}
public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
@ -1172,7 +1239,7 @@ public class IndexShardTests extends IndexShardTestCase {
test = otherShard.prepareIndexOnReplica(
SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(),
XContentType.JSON),
1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
otherShard.index(test);
final ShardRouting primaryShardRouting = shard.routingEntry();