Use standard semantics for retried auto-id requests (#47311)

Adds support for handling auto-id requests with optype CREATE. Also simplifies the code
handling this by using the standard indexing path when dealing with possible retry conflicts.

Relates #47169
This commit is contained in:
Yannick Welsch 2019-10-01 14:53:27 +02:00
parent 7b2613db55
commit 8c11fe610e
2 changed files with 96 additions and 97 deletions

View File

@ -1005,13 +1005,9 @@ public class InternalEngine extends Engine {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere();
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(1L);
}
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) {
plan = IndexingStrategy.optimizedAppendOnly(1L);
} else {
versionMap.enforceSafeAccess();
// resolves incoming version
@ -1044,7 +1040,7 @@ public class InternalEngine extends Engine {
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
index.versionType().updateVersion(currentVersion, index.version())
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version())
);
}
}
@ -1196,11 +1192,6 @@ public class InternalEngine extends Engine {
true, false, versionForIndexing, null);
}
static IndexingStrategy overrideExistingAsIfNotThere() {
return new IndexingStrategy(true, true, true,
false, 1L, null);
}
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false,
false, versionForIndexing, null);

View File

@ -3421,28 +3421,36 @@ public class InternalEngineTests extends EngineTestCase {
public void testDoubleDeliveryPrimary() throws IOException {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
final boolean create = randomBoolean();
Engine.Index operation = appendOnlyPrimary(doc, false, 1, create);
Engine.Index retry = appendOnlyPrimary(doc, true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 1, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
assertEquals(1, engine.getNumVersionLookups());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 0, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 0, 2, 0);
assertEquals(0, engine.getNumVersionLookups());
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}
engine.refresh("test");
@ -3450,20 +3458,34 @@ public class InternalEngineTests extends EngineTestCase {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits.value);
}
operation = appendOnlyPrimary(doc, false, 1);
retry = appendOnlyPrimary(doc, true, 1);
operation = appendOnlyPrimary(doc, false, 1, create);
retry = appendOnlyPrimary(doc, true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}
engine.refresh("test");
@ -3520,60 +3542,53 @@ public class InternalEngineTests extends EngineTestCase {
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
final Supplier<ParsedDocument> doc = () -> testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5));
Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5));
// operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
// and the version lookup is skipped
final boolean sameSeqNo = operation.seqNo() == retry.seqNo();
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
if (retry.seqNo() > operation.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
if (operation.seqNo() > retry.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
}
boolean replicaOperationIsRetry = randomBoolean();
Engine.Index operation = appendOnlyReplica(doc.get(), replicaOperationIsRetry, 1, randomIntBetween(0, 5));
Engine.IndexResult result = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(result.getTranslogLocation());
// promote to primary: first do refresh
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits.value);
}
operation = randomAppendOnly(doc.get(), false, 1);
retry = randomAppendOnly(doc.get(), true, 1);
final boolean create = randomBoolean();
operation = appendOnlyPrimary(doc.get(), false, 1, create);
Engine.Index retry = appendOnlyPrimary(doc.get(), true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
// if the replica operation wasn't a retry, the operation arriving on the newly promoted primary must be a retry
if (replicaOperationIsRetry) {
Engine.IndexResult indexResult = engine.index(operation);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}
engine.refresh("test");
@ -3651,10 +3666,11 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(indexResult.getVersion(), equalTo(1L));
isRetry = true;
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL,
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
assertNotEquals(indexResult.getSeqNo(), UNASSIGNED_SEQ_NO);
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
@ -3695,7 +3711,7 @@ public class InternalEngineTests extends EngineTestCase {
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
0);
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
assertTrue(indexResult.isCreated());
assertFalse(indexResult.isCreated());
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
@ -3720,12 +3736,16 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
UNASSIGNED_SEQ_NO, 0);
}
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
return appendOnlyPrimary(doc, retry, autoGeneratedIdTimestamp, randomBoolean());
}
public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null,
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
@ -3736,14 +3756,15 @@ public class InternalEngineTests extends EngineTestCase {
int numDocs = randomIntBetween(1000, 10000);
List<Engine.Index> docs = new ArrayList<>();
final boolean primary = randomBoolean();
final boolean create = randomBoolean();
for (int i = 0; i < numDocs; i++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(i), null,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
final Engine.Index originalIndex;
final Engine.Index retryIndex;
if (primary) {
originalIndex = appendOnlyPrimary(doc, false, i);
retryIndex = appendOnlyPrimary(doc, true, i);
originalIndex = appendOnlyPrimary(doc, false, i, create);
retryIndex = appendOnlyPrimary(doc, true, i, create);
} else {
originalIndex = appendOnlyReplica(doc, false, i, i * 2);
retryIndex = appendOnlyReplica(doc, true, i, i * 2);
@ -3776,25 +3797,12 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
if (primary) {
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
} else {
// we don't really know what order the operations will arrive and thus can't predict how many
// version lookups will be needed
assertThat(engine.getNumIndexVersionsLookups(), lessThanOrEqualTo(engine.getNumVersionLookups()));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
int count = searcher.count(new MatchAllDocsQuery());
assertEquals(numDocs, count);
}
if (primary) {
// primaries rely on lucene dedup and may index the same document twice
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
} else {
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
if (create || primary == false) {
assertLuceneOperations(engine, numDocs, 0, 0);
}
}
@ -3826,7 +3834,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp2));
engine.index(appendOnlyPrimary(doc, true, timestamp2, false));
assertEquals(maxTimestamp12, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
globalCheckpoint.set(1); // make sure flush cleans up commits for later.
engine.flush();
@ -4762,7 +4770,7 @@ public class InternalEngineTests extends EngineTestCase {
parsedDocument,
UNASSIGNED_SEQ_NO,
randomIntBetween(1, 8),
Versions.MATCH_ANY,
Versions.NOT_FOUND,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.nanoTime(),