Do not optimize append-only if seen normal op with higher seqno (#28787)

When processing an append-only operation, primary knows that operations 
can only conflict with another instance of the same operation. This is
true as the id was freshly generated. However this property doesn't hold
for replicas. As soon as an auto-generated ID was indexed into the
primary, it can be exposed to a search and users can issue a follow up
operation on it. In extremely rare cases, the follow up operation can be
arrived and processed on a replica before the original append-only
request. In this case we can't simply proceed with the append-only
request and blindly add it to the index without consulting the version
map. 

The following scenario can cause difference between primary and
replica.

1. Primary indexes an auto-gen-id doc. (id=X, v=1, s#=20)
2. A refresh cycle happens on primary
3. The new doc is picked up and modified - say by a delete by query
   request - Primary gets a delete doc (id=X, v=2, s#=30)
4. Delete doc is processed first on the replica (id=X, v=2, s#=30)
5. Indexing operation arrives on the replica, since it's an auto-gen-id
   request and the retry marker is lower, we put it into lucene without 
   any check. Replica has a doc the primary doesn't have.

To deal with a potential conflict between an append-only operation and a 
normal operation on replicas, we need to rely on sequence numbers. This
commit maintains the max seqno of non-append-only operations on replica
then only apply optimization for an append-only operation only if its
seq# is higher than the seq# of all non-append-only.
This commit is contained in:
Nhat Nguyen 2018-03-26 16:56:12 -04:00 committed by GitHub
parent 7bf9091942
commit 0ac89a32cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 150 additions and 9 deletions

View File

@ -136,6 +136,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
/**
@ -186,7 +187,7 @@ public class InternalEngine extends Engine {
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer);
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
@ -345,15 +346,20 @@ public class InternalEngine extends Engine {
}
}
private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
break;
final String key = entry.getKey();
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
assert maxUnsafeAutoIdTimestamp.get() == -1 :
"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
}
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
"max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
}
}
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}
@Override
@ -803,11 +809,24 @@ public class InternalEngine extends Engine {
private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
final IndexingStrategy plan;
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
// no need to deal with out of order delivery - we never saw this one
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
/*
* As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue
* a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before
* the original append-only. In this case we can't simply proceed with the append only without consulting the version map.
* If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen
* the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
@ -942,6 +961,11 @@ public class InternalEngine extends Engine {
return mayHaveBeenIndexBefore;
}
// for testing
long getMaxSeqNoOfNonAppendOnlyOperations() {
return maxSeqNoOfNonAppendOnlyOperations.get();
}
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
@ -1097,6 +1121,9 @@ public class InternalEngine extends Engine {
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
// unlike the primary, replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity

View File

@ -4530,4 +4530,96 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
final CountDownLatch latch = new CountDownLatch(1);
final Thread appendOnlyIndexer = new Thread(() -> {
try {
latch.countDown();
final int numDocs = scaledRandomIntBetween(100, 1000);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) {
engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
} else {
engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
}
}
} catch (Exception ex) {
throw new RuntimeException("Failed to index", ex);
}
});
appendOnlyIndexer.setName("append-only indexer");
appendOnlyIndexer.start();
latch.await();
long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED;
final int numOps = scaledRandomIntBetween(100, 1000);
for (int i = 0; i < numOps; i++) {
ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
if (randomBoolean()) {
engine.index(doc);
} else {
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
}
maxSeqNoOfNonAppendOnly = seqno;
} else { // On primary - do not update max_seqno for non-append-only operations
if (randomBoolean()) {
engine.index(indexForDoc(parsedDocument));
} else {
engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id())));
}
}
}
appendOnlyIndexer.join(120_000);
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.syncTranslog();
engine.flush();
}
try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
assertThat("max_seqno from non-append-only was not bootstrap from the safe commit",
engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get()));
}
}
public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
long lookupTimes = 0L;
final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
final int initDocs = between(0, 10);
for (int i = 0; i < initDocs; i++) {
index(engine, i);
lookupTimes++;
}
// doc1 is delayed and arrived after a non-append-only op.
final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo();
final long seqnoNormalOp = localCheckpointTracker.generateSeqNo();
if (randomBoolean()) {
engine.index(replicaIndexForDoc(
testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false));
} else {
engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong()));
}
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp));
// should not optimize for doc1 and process as a regular doc (eg. look up in version map)
engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), seqNoAppendOnly1));
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
// optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map.
engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
@ -419,6 +420,27 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
shard.onSettingsChanged();
}
/**
* This test ensures the consistency between primary and replica when non-append-only (eg. index request with id or delete) operation
* of the same document is processed before the original append-only request on replicas. The append-only document can be exposed and
* deleted on the primary before it is added to replica. Replicas should treat a late append-only request as a regular index request.
*/
public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
// Append-only request - without id
final BulkShardRequest indexRequest = indexOnPrimary(
new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
final String docId = Iterables.get(getShardDocUIDs(primary), 0);
final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
deleteOnReplica(deleteRequest, shards, replica);
indexOnReplica(indexRequest, shards, replica);
shards.assertAllEqual(0);
}
}
/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;