Changes to support retrieval of operations from translog based on specified range (#1257)
Backport changes to support retrieval of operations from translog based on specified range Signed-off-by: Sai Kumar <karanas@amazon.com>
This commit is contained in:
parent
eba365c5f7
commit
15e9f13762
|
@ -742,6 +742,15 @@ public abstract class Engine implements Closeable {
|
|||
public abstract Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
|
||||
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting
|
||||
* seqno range (both inclusive).
|
||||
*/
|
||||
public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
|
||||
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
|
||||
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
|
||||
* The returned snapshot can be retrieved from either Lucene index or translog files.
|
||||
|
|
|
@ -2702,6 +2702,17 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
|
||||
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
|
||||
if(historySource == HistorySource.INDEX) {
|
||||
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
|
||||
} else {
|
||||
return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
|
||||
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
|
||||
|
|
|
@ -38,7 +38,7 @@ package org.opensearch.index.engine;
|
|||
*/
|
||||
public final class MissingHistoryOperationsException extends IllegalStateException {
|
||||
|
||||
MissingHistoryOperationsException(String message) {
|
||||
public MissingHistoryOperationsException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2011,6 +2011,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Creates a new history snapshot for reading operations since
|
||||
* the provided starting seqno (inclusive) and ending seqno (inclusive)
|
||||
* The returned snapshot can be retrieved from either Lucene index or translog files.
|
||||
*/
|
||||
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source,
|
||||
long startingSeqNo, long endSeqNo) throws IOException {
|
||||
return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
|
||||
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.opensearch.core.internal.io.IOUtils;
|
|||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.VersionType;
|
||||
import org.opensearch.index.engine.Engine;
|
||||
import org.opensearch.index.engine.MissingHistoryOperationsException;
|
||||
import org.opensearch.index.seqno.SequenceNumbers;
|
||||
import org.opensearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.opensearch.index.shard.IndexShardComponent;
|
||||
|
@ -617,6 +618,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return newSnapshot(0, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
|
||||
return newSnapshot(fromSeqNo, toSeqNo, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new translog snapshot containing operations from the given range.
|
||||
*
|
||||
|
@ -624,7 +629,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
* @param toSeqNo the upper bound of the range (inclusive)
|
||||
* @return the new snapshot
|
||||
*/
|
||||
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
|
||||
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
|
||||
assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo;
|
||||
assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo;
|
||||
try (ReleasableLock ignored = readLock.acquire()) {
|
||||
|
@ -633,7 +638,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
.filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo())
|
||||
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
|
||||
final Snapshot snapshot = newMultiSnapshot(snapshots);
|
||||
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
|
||||
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -959,14 +964,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
private static final class SeqNoFilterSnapshot implements Snapshot {
|
||||
private final Snapshot delegate;
|
||||
private int filteredOpsCount;
|
||||
private int opsCount;
|
||||
private boolean requiredFullRange;
|
||||
private final long fromSeqNo; // inclusive
|
||||
private final long toSeqNo; // inclusive
|
||||
|
||||
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
|
||||
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
|
||||
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
|
||||
this.delegate = delegate;
|
||||
this.fromSeqNo = fromSeqNo;
|
||||
this.toSeqNo = toSeqNo;
|
||||
this.requiredFullRange = requiredFullRange;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -980,15 +988,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
|
||||
@Override
|
||||
public Operation next() throws IOException {
|
||||
public Operation next() throws IOException, MissingHistoryOperationsException {
|
||||
Translog.Operation op;
|
||||
while ((op = delegate.next()) != null) {
|
||||
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
|
||||
opsCount++;
|
||||
return op;
|
||||
} else {
|
||||
filteredOpsCount++;
|
||||
}
|
||||
}
|
||||
if(requiredFullRange && (toSeqNo - fromSeqNo +1) != opsCount) {
|
||||
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
|
||||
"and to_seqno [" + toSeqNo + "] found");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -5417,6 +5417,48 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testHistoryBasedOnSource() throws Exception {
|
||||
final List<Engine.Operation> operations = generateSingleDocHistory(false,
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
|
||||
final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(
|
||||
Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(defaultSettings.getSettings())
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
|
||||
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
|
||||
Set<Long> expectedSeqNos = new HashSet<>();
|
||||
try (Store store = createStore();
|
||||
Engine engine = createEngine(config(indexSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) {
|
||||
for (Engine.Operation op : operations) {
|
||||
if (op instanceof Engine.Index) {
|
||||
Engine.IndexResult indexResult = engine.index((Engine.Index) op);
|
||||
assertThat(indexResult.getFailure(), nullValue());
|
||||
expectedSeqNos.add(indexResult.getSeqNo());
|
||||
} else {
|
||||
Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op);
|
||||
assertThat(deleteResult.getFailure(), nullValue());
|
||||
expectedSeqNos.add(deleteResult.getSeqNo());
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
|
||||
}
|
||||
}
|
||||
MapperService mapperService = createMapperService("test");
|
||||
List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService);
|
||||
List<Translog.Operation> translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService);
|
||||
assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
|
||||
assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testLuceneHistoryOnPrimary() throws Exception {
|
||||
final List<Engine.Operation> operations = generateSingleDocHistory(false,
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
|
||||
|
|
|
@ -75,6 +75,7 @@ import org.opensearch.index.IndexSettings;
|
|||
import org.opensearch.index.VersionType;
|
||||
import org.opensearch.index.engine.Engine;
|
||||
import org.opensearch.index.engine.Engine.Operation.Origin;
|
||||
import org.opensearch.index.engine.MissingHistoryOperationsException;
|
||||
import org.opensearch.index.mapper.IdFieldMapper;
|
||||
import org.opensearch.index.mapper.ParseContext.Document;
|
||||
import org.opensearch.index.mapper.ParsedDocument;
|
||||
|
@ -120,6 +121,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -737,6 +739,72 @@ public class TranslogTests extends OpenSearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private Long populateTranslogOps(boolean withMissingOps) throws IOException {
|
||||
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
|
||||
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
|
||||
final int generations = between(2, 20);
|
||||
long currentSeqNo = 0L;
|
||||
List<Translog.Operation> firstGenOps = null;
|
||||
Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
|
||||
for (int gen = 0; gen < generations; gen++) {
|
||||
List<Long> seqNos = new ArrayList<>();
|
||||
int numOps = randomIntBetween(4, 10);
|
||||
for (int i = 0; i < numOps; i++, currentSeqNo++) {
|
||||
minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo);
|
||||
maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo);
|
||||
seqNos.add(currentSeqNo);
|
||||
}
|
||||
Collections.shuffle(seqNos, new Random(100));
|
||||
List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
|
||||
for (long seqNo : seqNos) {
|
||||
Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()});
|
||||
boolean shouldAdd = !withMissingOps || seqNo % 4 != 0;
|
||||
if(shouldAdd) {
|
||||
translog.add(op);
|
||||
ops.add(op);
|
||||
}
|
||||
}
|
||||
operationsByGen.put(translog.currentFileGeneration(), ops);
|
||||
if(firstGenOps == null) {
|
||||
firstGenOps = ops;
|
||||
}
|
||||
translog.rollGeneration();
|
||||
if (rarely()) {
|
||||
translog.rollGeneration(); // empty generation
|
||||
}
|
||||
}
|
||||
return currentSeqNo;
|
||||
}
|
||||
|
||||
public void testFullRangeSnapshot() throws Exception {
|
||||
// Successful snapshot
|
||||
long nextSeqNo = populateTranslogOps(false);
|
||||
long fromSeqNo = 0L;
|
||||
long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
|
||||
int totOps = 0;
|
||||
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
|
||||
totOps++;
|
||||
}
|
||||
assertEquals(totOps, toSeqNo - fromSeqNo + 1);
|
||||
}
|
||||
}
|
||||
|
||||
public void testFullRangeSnapshotWithFailures() throws Exception {
|
||||
long nextSeqNo = populateTranslogOps(true);
|
||||
long fromSeqNo = 0L;
|
||||
long toSeqNo = Math.min(nextSeqNo-1, fromSeqNo + 15);
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
|
||||
int totOps = 0;
|
||||
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
|
||||
totOps++;
|
||||
}
|
||||
fail("Should throw exception for missing operations");
|
||||
} catch(MissingHistoryOperationsException e) {
|
||||
assertTrue(e.getMessage().contains("Not all operations between from_seqno"));
|
||||
}
|
||||
}
|
||||
|
||||
public void assertFileIsPresent(Translog translog, long id) {
|
||||
if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) {
|
||||
return;
|
||||
|
|
|
@ -1081,6 +1081,22 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
|
|||
return operations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source.
|
||||
*/
|
||||
public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine, Engine.HistorySource historySource,
|
||||
MapperService mapper) throws IOException {
|
||||
final List<Translog.Operation> operations = new ArrayList<>();
|
||||
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper,
|
||||
0, Long.MAX_VALUE, false)) {
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null){
|
||||
operations.add(op);
|
||||
}
|
||||
}
|
||||
return operations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the provided engine has a consistent document history between translog and Lucene index.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue