Validate source of an index in LuceneChangesSnapshot (#32288)

Today it's possible to encounter an Index operation in Lucene whose
_source is disabled and whose _recovery_source was pruned by the
MergePolicy. When that happens, we create a Translog#Index without
source and let the caller validate it later. However, this approach is
challenging for the caller.

Deletes and No-Ops don't allow invoking the "source()" method, so the
caller has to make sure to call "source()" only on index operations.
The current implementation in CCR does not follow this and fails to
replicate deletes or no-ops. Moreover, it's easier to reason about the
code if a Translog#Index always has a source.
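
For illustration, a minimal sketch of the guard every snapshot consumer previously needed (hypothetical caller code, not part of this commit; "snapshot" and "consume" are placeholders). Translog.Delete and Translog.NoOp reject getSource(), so the null check is only safe behind an opType check:

    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        // Only index operations may carry a source; _source can be disabled and
        // _recovery_source pruned, in which case getSource() used to return null.
        if (op.opType() == Translog.Operation.Type.INDEX && op.getSource() == null) {
            throw new IllegalStateException("source not found for operation " + op);
        }
        // Calling op.getSource() unconditionally here would throw on deletes and
        // no-ops, which is the CCR bug described above.
        consume(op);
    }

After this change, LuceneChangesSnapshot validates the source itself: it either throws (when the full range is required) or counts the operation as skipped, so a Translog#Index handed to callers always carries a source and this guard becomes unnecessary.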
Nhat Nguyen 2018-07-27 08:16:52 -04:00 committed by GitHub
parent 90c58872ff
commit 8474f8a01c
6 changed files with 49 additions and 34 deletions


@@ -258,10 +258,21 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
 assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
 } else {
 final BytesReference source = fields.source();
+if (source == null) {
+// TODO: Callers should ask for the range that source should be retained. Thus we should always
+// check for the existence source once we make peer-recovery to send ops after the local checkpoint.
+if (requiredFullRange) {
+throw new IllegalStateException("source not found for seqno=" + seqNo +
+" from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo);
+} else {
+skippedOperations++;
+return null;
+}
+}
 // TODO: pass the latest timestamp from engine.
 final long autoGeneratedIdTimestamp = -1;
 op = new Translog.Index(type, id, seqNo, primaryTerm, version,
-source == null ? null : source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
+source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
 }
 }
 assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +


@@ -1061,7 +1061,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 byte[] source, String routing, long autoGeneratedIdTimestamp) {
 this.type = type;
 this.id = id;
-this.source = source == null ? null : new BytesArray(source);
+this.source = new BytesArray(source);
 this.seqNo = seqNo;
 this.primaryTerm = primaryTerm;
 this.version = version;
@@ -1111,7 +1111,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 @Override
 public Source getSource() {
-return source == null ? null : new Source(source, routing);
+return new Source(source, routing);
 }
 private void write(final StreamOutput out) throws IOException {


@@ -1418,26 +1418,36 @@ public class InternalEngineTests extends EngineTestCase {
 final MapperService mapperService = createMapperService("test");
 final boolean omitSourceAllTheTime = randomBoolean();
 final Set<String> liveDocs = new HashSet<>();
+final Set<String> liveDocsWithSource = new HashSet<>();
 try (Store store = createStore();
 InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null,
 globalCheckpoint::get))) {
 int numDocs = scaledRandomIntBetween(10, 100);
 for (int i = 0; i < numDocs; i++) {
-ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean()
-|| omitSourceAllTheTime);
+boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
+ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
 engine.index(indexForDoc(doc));
 liveDocs.add(doc.id());
+if (useRecoverySource == false) {
+liveDocsWithSource.add(Integer.toString(i));
+}
 }
 for (int i = 0; i < numDocs; i++) {
-ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean()
-|| omitSourceAllTheTime);
+boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
+ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
 if (randomBoolean()) {
 engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
 liveDocs.remove(doc.id());
+liveDocsWithSource.remove(doc.id());
 }
 if (randomBoolean()) {
 engine.index(indexForDoc(doc));
 liveDocs.add(doc.id());
+if (useRecoverySource == false) {
+liveDocsWithSource.add(doc.id());
+} else {
+liveDocsWithSource.remove(doc.id());
+}
 }
 if (randomBoolean()) {
 engine.flush(randomBoolean(), true);
@@ -1453,12 +1463,7 @@ public class InternalEngineTests extends EngineTestCase {
 minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
 }
 engine.forceMerge(true, 1, false, false, false);
-assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
-if (luceneOp.seqNo() >= minSeqNoToRetain) {
-assertNotNull(luceneOp.getSource());
-assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
-}
-});
+assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
 Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
 .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
 for (long seqno = 0; seqno <= engine.getLocalCheckpoint(); seqno++) {
@@ -1483,10 +1488,8 @@ public class InternalEngineTests extends EngineTestCase {
 globalCheckpoint.set(engine.getLocalCheckpoint());
 engine.syncTranslog();
 engine.forceMerge(true, 1, false, false, false);
-assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
-assertEquals(translogOp.getSource().source, B_1);
-});
-assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
+assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
+assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size()));
 }
 }


@@ -104,7 +104,6 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -822,14 +821,6 @@ public abstract class EngineTestCase extends ESTestCase {
 * Asserts the provided engine has a consistent document history between translog and Lucene index.
 */
 public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
-assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapper, (luceneOp, translogOp) ->
-assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)));
-}
-public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper,
-BiConsumer<Translog.Operation, Translog.Operation> assertSource)
-throws IOException {
 if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) {
 return;
 }
@@ -867,7 +858,7 @@ public abstract class EngineTestCase extends ESTestCase {
 assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
 assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
 if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
-assertSource.accept(luceneOp, translogOp);
+assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
 }
 }
 }


@@ -283,10 +283,6 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
 Translog.Operation op;
 while ((op = snapshot.next()) != null) {
-if (op.getSource() == null) {
-throw new IllegalStateException("source not found for operation: " + op + " fromSeqNo: " + fromSeqNo +
-" maxOperationCount: " + maxOperationCount);
-}
 operations.add(op);
 seenBytes += op.estimateSize();
 if (seenBytes > maxOperationSizeInBytes) {


@@ -7,6 +7,9 @@ package org.elasticsearch.xpack.ccr.action;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -50,12 +54,22 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
 shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
 docCount += leaderGroup.appendDocs(randomInt(128));
 leaderGroup.syncGlobalCheckpoint();
 leaderGroup.assertAllEqual(docCount);
-int expectedCount = docCount;
+Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
 assertBusy(() -> {
 assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-followerGroup.assertAllEqual(expectedCount);
+followerGroup.assertAllEqual(indexedDocIds.size());
 });
+// Deletes should be replicated to the follower
+List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
+for (String deleteId : deleteDocIds) {
+BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
+assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
+}
+leaderGroup.syncGlobalCheckpoint();
+assertBusy(() -> {
+assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
+});
 shardFollowTask.markAsCompleted();
 }