Changes to support retrieval of operations from translog based on specified range (#1257)
Backport of the changes to support retrieval of operations from the translog based on a specified range.

Signed-off-by: Sai Kumar <karanas@amazon.com>
parent eba365c5f7
commit 15e9f13762
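In summary, the change threads a sequence-number range and a requiredFullRange flag through the history APIs: the abstract Engine gains a newChangesSnapshot variant that takes a HistorySource, with a default that falls back to the existing Lucene-based snapshot, and InternalEngine overrides it so that HistorySource.TRANSLOG routes to a new range-bounded Translog.newSnapshot(fromSeqNo, toSeqNo, requiredFullRange). The sketch below is illustrative and not code from this commit; the class wrapper, method name, and the "example" source string are assumptions, while the newChangesSnapshot signature comes from the diff.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.translog.Translog;

// Illustrative sketch only: read a bounded seqno range of history from the translog
// and require that every operation in the range is still present.
public final class RangeHistoryExample {
    static List<Translog.Operation> readRange(Engine engine, MapperService mapperService,
                                              long fromSeqNo, long toSeqNo) throws IOException {
        final List<Translog.Operation> ops = new ArrayList<>();
        // requiredFullRange = true: snapshot.next() throws MissingHistoryOperationsException
        // if any seqno in [fromSeqNo, toSeqNo] is no longer available in the translog.
        try (Translog.Snapshot snapshot = engine.newChangesSnapshot(
                "example", Engine.HistorySource.TRANSLOG, mapperService,
                fromSeqNo, toSeqNo, true)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                ops.add(op); // every returned op satisfies fromSeqNo <= op.seqNo() <= toSeqNo
            }
        }
        return ops;
    }
}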
Engine.java
@@ -742,6 +742,15 @@ public abstract class Engine implements Closeable {
     public abstract Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
                                                           long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;
 
+    /**
+     * Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno is in the requested
+     * seqno range (both inclusive).
+     */
+    public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
+                                                long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
+        return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
+    }
+
     /**
      * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
      * The returned snapshot can be retrieved from either Lucene index or translog files.
InternalEngine.java
@@ -2702,6 +2702,17 @@ public class InternalEngine extends Engine {
         }
     }
 
+    @Override
+    public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
+                                                long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
+        if (historySource == HistorySource.INDEX) {
+            return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
+        } else {
+            return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
+        }
+    }
+
     @Override
     public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
                                                 long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
MissingHistoryOperationsException.java
@@ -38,7 +38,7 @@ package org.opensearch.index.engine;
  */
 public final class MissingHistoryOperationsException extends IllegalStateException {
 
-    MissingHistoryOperationsException(String message) {
+    public MissingHistoryOperationsException(String message) {
         super(message);
     }
 }
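The constructor is widened from package-private to public because Translog, which lives in a different package (org.opensearch.index.translog), now constructs this exception when a snapshot that requires the full range finds operations missing (see the Translog changes below).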
IndexShard.java
@@ -2011,6 +2011,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
     }
 
+    /**
+     *
+     * Creates a new history snapshot for reading operations between
+     * the provided starting seqno (inclusive) and ending seqno (inclusive).
+     * The returned snapshot can be retrieved from either Lucene index or translog files.
+     */
+    public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source,
+                                                  long startingSeqNo, long endSeqNo) throws IOException {
+        return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true);
+    }
+
     /**
      * Checks if we have a completed history of operations since the given starting seqno (inclusive).
      * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
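The shard-level overload always passes true for requiredFullRange, so callers get an all-or-nothing view of the requested range. A hypothetical caller might look like the following sketch; the method name, the "resync" reason string, and the surrounding error handling are assumptions for illustration, and only the getHistoryOperations(reason, source, startingSeqNo, endSeqNo) signature comes from this commit.

import java.io.IOException;

import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;

// Hypothetical caller sketch: iterate a closed seqno range of a shard's history.
public final class ShardHistoryExample {
    static int countRange(IndexShard shard, long startingSeqNo, long endSeqNo) throws IOException {
        int count = 0;
        try (Translog.Snapshot snapshot =
                 shard.getHistoryOperations("resync", Engine.HistorySource.TRANSLOG, startingSeqNo, endSeqNo)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                count++; // a MissingHistoryOperationsException surfaces here if the range is incomplete
            }
        }
        return count;
    }
}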
Translog.java
@@ -53,6 +53,7 @@ import org.opensearch.core.internal.io.IOUtils;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.VersionType;
 import org.opensearch.index.engine.Engine;
+import org.opensearch.index.engine.MissingHistoryOperationsException;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.AbstractIndexShardComponent;
 import org.opensearch.index.shard.IndexShardComponent;
@@ -617,6 +618,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return newSnapshot(0, Long.MAX_VALUE);
     }
 
+    public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
+        return newSnapshot(fromSeqNo, toSeqNo, false);
+    }
+
     /**
      * Creates a new translog snapshot containing operations from the given range.
      *
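The new two-argument overload preserves the previous lenient behavior for existing callers by delegating with requiredFullRange = false; in other words (illustrative, assuming a populated translog and valid bounds fromSeqNo and toSeqNo):

// These two calls are equivalent after this change; neither throws
// MissingHistoryOperationsException when operations in the range are missing.
Translog.Snapshot lenient = translog.newSnapshot(fromSeqNo, toSeqNo);
Translog.Snapshot sameThing = translog.newSnapshot(fromSeqNo, toSeqNo, false);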
@@ -624,7 +629,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * @param toSeqNo the upper bound of the range (inclusive)
      * @return the new snapshot
      */
-    public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
+    public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
         assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo;
         assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo;
         try (ReleasableLock ignored = readLock.acquire()) {
@@ -633,7 +638,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo())
                 .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
             final Snapshot snapshot = newMultiSnapshot(snapshots);
-            return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
+            return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange);
         }
     }
@@ -959,14 +964,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     private static final class SeqNoFilterSnapshot implements Snapshot {
         private final Snapshot delegate;
         private int filteredOpsCount;
+        private int opsCount;
+        private boolean requiredFullRange;
         private final long fromSeqNo; // inclusive
         private final long toSeqNo; // inclusive
 
-        SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
+        SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
             assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
             this.delegate = delegate;
             this.fromSeqNo = fromSeqNo;
             this.toSeqNo = toSeqNo;
+            this.requiredFullRange = requiredFullRange;
         }
 
         @Override
@@ -980,15 +988,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
 
         @Override
-        public Operation next() throws IOException {
+        public Operation next() throws IOException, MissingHistoryOperationsException {
             Translog.Operation op;
             while ((op = delegate.next()) != null) {
                 if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
+                    opsCount++;
                     return op;
                 } else {
                     filteredOpsCount++;
                 }
             }
+            if (requiredFullRange && (toSeqNo - fromSeqNo + 1) != opsCount) {
+                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
+                    "and to_seqno [" + toSeqNo + "] found");
+            }
             return null;
         }
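Because both ends of the range are inclusive, a complete range [fromSeqNo, toSeqNo] contains exactly toSeqNo - fromSeqNo + 1 operations, and opsCount is incremented only for operations that fall inside the range. A quick worked example with illustrative numbers, not from the commit:

// fromSeqNo = 5, toSeqNo = 9 -> expected = 9 - 5 + 1 = 5 operations (seqnos 5,6,7,8,9).
// If the translog only yields seqnos 5, 6, 8 and 9, then opsCount == 4 != 5, so next()
// throws MissingHistoryOperationsException once the delegate is exhausted, provided
// requiredFullRange is true.
long fromSeqNo = 5, toSeqNo = 9;
long expected = toSeqNo - fromSeqNo + 1;     // 5
int opsCount = 4;                            // seqno 7 missing
boolean shouldFail = (expected != opsCount); // true -> exception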
InternalEngineTests.java
@@ -5417,6 +5417,48 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    public void testHistoryBasedOnSource() throws Exception {
+        final List<Engine.Operation> operations = generateSingleDocHistory(false,
+            randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
+        final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(
+            Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
+        Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
+        final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
+        Set<Long> expectedSeqNos = new HashSet<>();
+        try (Store store = createStore();
+             Engine engine = createEngine(config(indexSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) {
+            for (Engine.Operation op : operations) {
+                if (op instanceof Engine.Index) {
+                    Engine.IndexResult indexResult = engine.index((Engine.Index) op);
+                    assertThat(indexResult.getFailure(), nullValue());
+                    expectedSeqNos.add(indexResult.getSeqNo());
+                } else {
+                    Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op);
+                    assertThat(deleteResult.getFailure(), nullValue());
+                    expectedSeqNos.add(deleteResult.getSeqNo());
+                }
+                if (rarely()) {
+                    engine.refresh("test");
+                }
+                if (rarely()) {
+                    engine.flush();
+                }
+                if (rarely()) {
+                    engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
+                }
+            }
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService);
+            List<Translog.Operation> translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService);
+            assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+            assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+        }
+    }
+
     public void testLuceneHistoryOnPrimary() throws Exception {
         final List<Engine.Operation> operations = generateSingleDocHistory(false,
             randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
TranslogTests.java
@@ -75,6 +75,7 @@ import org.opensearch.index.IndexSettings;
 import org.opensearch.index.VersionType;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.Engine.Operation.Origin;
+import org.opensearch.index.engine.MissingHistoryOperationsException;
 import org.opensearch.index.mapper.IdFieldMapper;
 import org.opensearch.index.mapper.ParseContext.Document;
 import org.opensearch.index.mapper.ParsedDocument;
@@ -120,6 +121,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -737,6 +739,72 @@ public class TranslogTests extends OpenSearchTestCase {
         }
     }
 
+    private Long populateTranslogOps(boolean withMissingOps) throws IOException {
+        long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        final int generations = between(2, 20);
+        long currentSeqNo = 0L;
+        List<Translog.Operation> firstGenOps = null;
+        Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
+        for (int gen = 0; gen < generations; gen++) {
+            List<Long> seqNos = new ArrayList<>();
+            int numOps = randomIntBetween(4, 10);
+            for (int i = 0; i < numOps; i++, currentSeqNo++) {
+                minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo);
+                maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo);
+                seqNos.add(currentSeqNo);
+            }
+            Collections.shuffle(seqNos, new Random(100));
+            List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
+            for (long seqNo : seqNos) {
+                Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()});
+                boolean shouldAdd = !withMissingOps || seqNo % 4 != 0;
+                if (shouldAdd) {
+                    translog.add(op);
+                    ops.add(op);
+                }
+            }
+            operationsByGen.put(translog.currentFileGeneration(), ops);
+            if (firstGenOps == null) {
+                firstGenOps = ops;
+            }
+            translog.rollGeneration();
+            if (rarely()) {
+                translog.rollGeneration(); // empty generation
+            }
+        }
+        return currentSeqNo;
+    }
+
+    public void testFullRangeSnapshot() throws Exception {
+        // Successful snapshot
+        long nextSeqNo = populateTranslogOps(false);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            assertEquals(totOps, toSeqNo - fromSeqNo + 1);
+        }
+    }
+
+    public void testFullRangeSnapshotWithFailures() throws Exception {
+        long nextSeqNo = populateTranslogOps(true);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            fail("Should throw exception for missing operations");
+        } catch (MissingHistoryOperationsException e) {
+            assertTrue(e.getMessage().contains("Not all operations between from_seqno"));
+        }
+    }
+
     public void assertFileIsPresent(Translog translog, long id) {
         if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) {
             return;
EngineTestCase.java
@@ -1081,6 +1081,22 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
         return operations;
     }
 
+    /**
+     * Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source.
+     */
+    public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine, Engine.HistorySource historySource,
+                                                                          MapperService mapper) throws IOException {
+        final List<Translog.Operation> operations = new ArrayList<>();
+        try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper,
+                0, Long.MAX_VALUE, false)) {
+            Translog.Operation op;
+            while ((op = snapshot.next()) != null) {
+                operations.add(op);
+            }
+        }
+        return operations;
+    }
+
     /**
      * Asserts the provided engine has a consistent document history between translog and Lucene index.
      */