Advance max_seq_no before add operation to Lucene (#38879)
Today when processing an operation on a replica engine (or the following engine), we first add it to Lucene, then add it to translog, then finally marks its seq_no as completed. If a flush occurs after step1, but before step-3, the max_seq_no in the commit's user_data will be smaller than the seq_no of some documents in the Lucene commit.
This commit is contained in:
parent
20755e666c
commit
7e20a92888
|
@ -948,6 +948,7 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
markSeqNoAsSeen(index.seqNo());
|
||||||
return plan;
|
return plan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1301,6 +1302,7 @@ public class InternalEngine extends Engine {
|
||||||
delete.seqNo(), delete.version());
|
delete.seqNo(), delete.version());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
markSeqNoAsSeen(delete.seqNo());
|
||||||
return plan;
|
return plan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1455,6 +1457,7 @@ public class InternalEngine extends Engine {
|
||||||
public NoOpResult noOp(final NoOp noOp) {
|
public NoOpResult noOp(final NoOp noOp) {
|
||||||
NoOpResult noOpResult;
|
NoOpResult noOpResult;
|
||||||
try (ReleasableLock ignored = readLock.acquire()) {
|
try (ReleasableLock ignored = readLock.acquire()) {
|
||||||
|
markSeqNoAsSeen(noOp.seqNo());
|
||||||
noOpResult = innerNoOp(noOp);
|
noOpResult = innerNoOp(noOp);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
|
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
|
||||||
|
@ -2434,6 +2437,13 @@ public class InternalEngine extends Engine {
|
||||||
localCheckpointTracker.waitForOpsToComplete(seqNo);
|
localCheckpointTracker.waitForOpsToComplete(seqNo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
|
||||||
|
*/
|
||||||
|
protected final void markSeqNoAsSeen(long seqNo) {
|
||||||
|
localCheckpointTracker.advanceMaxSeqNo(seqNo);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the given operation has been processed in this engine or not.
|
* Checks if the given operation has been processed in this engine or not.
|
||||||
* @return true if the given operation was processed; otherwise false.
|
* @return true if the given operation was processed; otherwise false.
|
||||||
|
|
|
@ -81,6 +81,15 @@ public class LocalCheckpointTracker {
|
||||||
return nextSeqNo++;
|
return nextSeqNo++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the provided sequence number as seen and updates the max_seq_no if needed.
|
||||||
|
*/
|
||||||
|
public synchronized void advanceMaxSeqNo(long seqNo) {
|
||||||
|
if (seqNo >= nextSeqNo) {
|
||||||
|
nextSeqNo = seqNo + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
|
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
|
||||||
*
|
*
|
||||||
|
|
|
@ -5653,4 +5653,42 @@ public class InternalEngineTests extends EngineTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMaxSeqNoInCommitUserData() throws Exception {
|
||||||
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
Thread rollTranslog = new Thread(() -> {
|
||||||
|
while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
|
||||||
|
engine.rollTranslogGeneration(); // make adding operations to translog slower
|
||||||
|
}
|
||||||
|
});
|
||||||
|
rollTranslog.start();
|
||||||
|
|
||||||
|
Thread indexing = new Thread(() -> {
|
||||||
|
long seqNo = 0;
|
||||||
|
while (running.get() && seqNo <= 1000) {
|
||||||
|
try {
|
||||||
|
String id = Long.toString(between(1, 50));
|
||||||
|
if (randomBoolean()) {
|
||||||
|
ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
|
||||||
|
engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
|
||||||
|
} else {
|
||||||
|
engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
|
||||||
|
}
|
||||||
|
seqNo++;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
indexing.start();
|
||||||
|
|
||||||
|
int numCommits = between(5, 20);
|
||||||
|
for (int i = 0; i < numCommits; i++) {
|
||||||
|
engine.flush(false, true);
|
||||||
|
}
|
||||||
|
running.set(false);
|
||||||
|
indexing.join();
|
||||||
|
rollTranslog.join();
|
||||||
|
assertMaxSeqNoInCommitUserData(engine);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.lucene.document.Field;
|
||||||
import org.apache.lucene.document.NumericDocValuesField;
|
import org.apache.lucene.document.NumericDocValuesField;
|
||||||
import org.apache.lucene.document.StoredField;
|
import org.apache.lucene.document.StoredField;
|
||||||
import org.apache.lucene.document.TextField;
|
import org.apache.lucene.document.TextField;
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.IndexWriterConfig;
|
import org.apache.lucene.index.IndexWriterConfig;
|
||||||
import org.apache.lucene.index.LeafReader;
|
import org.apache.lucene.index.LeafReader;
|
||||||
|
@ -126,6 +128,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
||||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
public abstract class EngineTestCase extends ESTestCase {
|
public abstract class EngineTestCase extends ESTestCase {
|
||||||
|
@ -254,18 +257,20 @@ public abstract class EngineTestCase extends ESTestCase {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
if (engine != null && engine.isClosed.get() == false) {
|
try {
|
||||||
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
|
if (engine != null && engine.isClosed.get() == false) {
|
||||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
|
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
|
||||||
|
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
|
||||||
|
assertMaxSeqNoInCommitUserData(engine);
|
||||||
|
}
|
||||||
|
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
|
||||||
|
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
|
||||||
|
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
|
||||||
|
assertMaxSeqNoInCommitUserData(replicaEngine);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
|
||||||
}
|
}
|
||||||
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
|
|
||||||
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
|
|
||||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
|
|
||||||
}
|
|
||||||
IOUtils.close(
|
|
||||||
replicaEngine, storeReplica,
|
|
||||||
engine, store);
|
|
||||||
terminate(threadPool);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1067,6 +1072,21 @@ public abstract class EngineTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.
|
||||||
|
*/
|
||||||
|
public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
|
||||||
|
List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
|
||||||
|
for (IndexCommit commit : commits) {
|
||||||
|
try (DirectoryReader reader = DirectoryReader.open(commit)) {
|
||||||
|
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||||
|
Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
|
||||||
|
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
|
||||||
|
greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static MapperService createMapperService(String type) throws IOException {
|
public static MapperService createMapperService(String type) throws IOException {
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
||||||
.settings(Settings.builder()
|
.settings(Settings.builder()
|
||||||
|
|
|
@ -68,6 +68,7 @@ public final class FollowingEngine extends InternalEngine {
|
||||||
@Override
|
@Override
|
||||||
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
|
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
|
||||||
preFlight(index);
|
preFlight(index);
|
||||||
|
markSeqNoAsSeen(index.seqNo());
|
||||||
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
|
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
|
||||||
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
|
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
|
||||||
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
|
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
|
||||||
|
@ -103,6 +104,7 @@ public final class FollowingEngine extends InternalEngine {
|
||||||
@Override
|
@Override
|
||||||
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
|
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
|
||||||
preFlight(delete);
|
preFlight(delete);
|
||||||
|
markSeqNoAsSeen(delete.seqNo());
|
||||||
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
|
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
|
||||||
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
|
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
|
||||||
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
|
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
|
||||||
|
|
|
@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
|
import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
|
||||||
|
import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
@ -659,4 +660,49 @@ public class FollowingEngineTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMaxSeqNoInCommitUserData() throws Exception {
|
||||||
|
final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
|
||||||
|
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
|
||||||
|
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
|
||||||
|
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
|
||||||
|
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
|
||||||
|
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
|
||||||
|
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
|
||||||
|
try (FollowingEngine engine = createEngine(store, engineConfig)) {
|
||||||
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
Thread rollTranslog = new Thread(() -> {
|
||||||
|
while (running.get() && getTranslog(engine).currentFileGeneration() < 500) {
|
||||||
|
engine.rollTranslogGeneration(); // make adding operations to translog slower
|
||||||
|
}
|
||||||
|
});
|
||||||
|
rollTranslog.start();
|
||||||
|
|
||||||
|
Thread indexing = new Thread(() -> {
|
||||||
|
List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, VersionType.EXTERNAL, 2, 50, 500, "id");
|
||||||
|
engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong());
|
||||||
|
for (Engine.Operation op : ops) {
|
||||||
|
if (running.get() == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
EngineTestCase.applyOperation(engine, op);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
indexing.start();
|
||||||
|
|
||||||
|
int numCommits = between(5, 20);
|
||||||
|
for (int i = 0; i < numCommits; i++) {
|
||||||
|
engine.flush(false, true);
|
||||||
|
}
|
||||||
|
running.set(false);
|
||||||
|
indexing.join();
|
||||||
|
rollTranslog.join();
|
||||||
|
EngineTestCase.assertMaxSeqNoInCommitUserData(engine);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue