[Remove] types from Uid and remaining types/Uid from translog (#2450)

Removes types from the Uid class along with cleaning up all obsolete MapperService
dependencies. This includes removing the Uid from the Translog Delete operation,
which is no longer needed now that the type dependency has been removed.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
Nick Knize 2022-03-13 13:49:51 -04:00 committed by GitHub
parent 95d4750249
commit bdcaec5caf
18 changed files with 108 additions and 224 deletions
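In practice the change collapses delete call sites to the document id alone; a minimal before/after sketch (class name hypothetical, not part of the commit):

import org.opensearch.index.translog.Translog;

class DeleteCallSiteSketch {
    // before #2450: new Translog.Delete(id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, version)
    // after: the id alone identifies the document; a uid term is derived only where Lucene needs one
    static Translog.Operation delete(String id, long seqNo, long primaryTerm, long version) {
        return new Translog.Delete(id, seqNo, primaryTerm, version);
    }
}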

View File: Engine.java

@ -72,7 +72,6 @@ import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.Mapping;
import org.opensearch.index.mapper.ParseContext.Document;
import org.opensearch.index.mapper.ParsedDocument;
@ -736,13 +735,8 @@ public abstract class Engine implements Closeable {
* Creates a new history snapshot from Lucene for reading operations whose seqno is in the requested seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public abstract Translog.Snapshot newChangesSnapshot(
String source,
MapperService mapperService,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException;
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException;
public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
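For context, a sketch of how callers drain the slimmed-down snapshot API; it mirrors the EngineTestCase.readAllOperationsInLucene helper later in this diff (the method wrapper is illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.opensearch.index.translog.Translog;

static List<Translog.Operation> drainAllOperations(Engine engine) throws IOException {
    final List<Translog.Operation> operations = new ArrayList<>();
    // no MapperService is needed anymore to resolve uids while reading
    try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            operations.add(op);
        }
    }
    return operations;
}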

View File: InternalEngine.java

@ -91,7 +91,6 @@ import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
@ -2773,20 +2772,13 @@ public class InternalEngine extends Engine {
}
@Override
public Translog.Snapshot newChangesSnapshot(
String source,
MapperService mapperService,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException {
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo,
toSeqNo,

View File: LuceneChangesSnapshot.java

@ -36,7 +36,6 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
@ -51,11 +50,8 @@ import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.translog.Translog;
import java.io.Closeable;
@ -77,7 +73,6 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private final boolean requiredFullRange;
private final IndexSearcher indexSearcher;
private final MapperService mapperService;
private int docIndex = 0;
private final int totalHits;
private ScoreDoc[] scoreDocs;
@ -88,20 +83,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
*
* @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
* @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
* @param searchBatchSize the number of documents to be returned by each search
* @param fromSeqNo the min requesting seq# - inclusive
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
MapperService mapperService,
int searchBatchSize,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException {
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
@ -114,7 +102,6 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
IOUtils.close(engineSearcher);
}
};
this.mapperService = mapperService;
final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L);
this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize;
this.fromSeqNo = fromSeqNo;
@ -278,19 +265,17 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
: SourceFieldMapper.NAME;
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
leaf.reader().document(segmentDocID, fields);
fields.postProcess(mapperService);
final Translog.Operation op;
final boolean isTombstone = parallelArray.isTombStone[docIndex];
if (isTombstone && fields.uid() == null) {
if (isTombstone && fields.id() == null) {
op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString());
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]";
} else {
final String id = fields.uid().id();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final String id = fields.id();
if (isTombstone) {
op = new Translog.Delete(id, uid, seqNo, primaryTerm, version);
op = new Translog.Delete(id, seqNo, primaryTerm, version);
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
} else {
final BytesReference source = fields.source();
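Direct construction changes the same way, dropping only the MapperService argument; a sketch assuming code inside org.opensearch.index.engine (the class is package-private) with placeholder seq# bounds:

Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
// on successful construction the snapshot takes ownership of the searcher
try (Translog.Snapshot snapshot =
        new LuceneChangesSnapshot(searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, false)) {
    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        // each op now carries a plain String id; no Uid/type resolution happens here
    }
}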

View File: ReadOnlyEngine.java

@ -46,7 +46,6 @@ import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
@ -326,13 +325,7 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public Translog.Snapshot newChangesSnapshot(
String source,
MapperService mapperService,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) {
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
return newEmptySnapshot();
}

View File: FieldsVisitor.java

@ -36,7 +36,6 @@ import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.IgnoredFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
@ -67,7 +66,7 @@ public class FieldsVisitor extends StoredFieldVisitor {
private final String sourceFieldName;
private final Set<String> requiredFields;
protected BytesReference source;
protected String type, id;
protected String id;
protected Map<String, List<Object>> fieldsValues;
public FieldsVisitor(boolean loadSource) {
@ -98,10 +97,6 @@ public class FieldsVisitor extends StoredFieldVisitor {
}
public void postProcess(MapperService mapperService) {
final DocumentMapper mapper = mapperService.documentMapper();
if (mapper != null) {
type = mapper.type();
}
for (Map.Entry<String, List<Object>> entry : fields().entrySet()) {
MappedFieldType fieldType = mapperService.fieldType(entry.getKey());
if (fieldType == null) {
@ -167,13 +162,8 @@ public class FieldsVisitor extends StoredFieldVisitor {
return source;
}
public Uid uid() {
if (id == null) {
return null;
} else if (type == null) {
throw new IllegalStateException("Call postProcess before getting the uid");
}
return new Uid(type, id);
public String id() {
return id;
}
public String routing() {
@ -195,7 +185,6 @@ public class FieldsVisitor extends StoredFieldVisitor {
public void reset() {
if (fieldsValues != null) fieldsValues.clear();
source = null;
type = null;
id = null;
requiredFields.addAll(BASE_REQUIRED_FIELDS);
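With the type gone, FieldsVisitor exposes the document id directly. A minimal sketch, assuming leafReader and segmentDocId are in scope; postProcess(MapperService) is still used to convert stored field values, but is no longer required to resolve the id:

import org.apache.lucene.index.LeafReader;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.index.fieldvisitor.FieldsVisitor;

FieldsVisitor visitor = new FieldsVisitor(true); // also load _source
leafReader.document(segmentDocId, visitor);
String id = visitor.id();                 // was visitor.uid().id(), which needed the type
BytesReference source = visitor.source();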

View File: Uid.java

@ -43,52 +43,13 @@ public final class Uid {
public static final char DELIMITER = '#';
public static final byte DELIMITER_BYTE = 0x23;
private final String type;
private final String id;
public Uid(String type, String id) {
this.type = type;
this.id = id;
}
public String type() {
return type;
}
public String id() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Uid uid = (Uid) o;
if (id != null ? !id.equals(uid.id) : uid.id != null) return false;
if (type != null ? !type.equals(uid.type) : uid.type != null) return false;
return true;
}
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (id != null ? id.hashCode() : 0);
return result;
}
@Override
public String toString() {
return type + "#" + id;
}
private static final int UTF8 = 0xff;
private static final int NUMERIC = 0xfe;
private static final int BASE64_ESCAPE = 0xfd;
// non-instantiable
private Uid() {}
static boolean isURLBase64WithoutPadding(String id) {
// We are not lenient about padding chars ('=') otherwise
// 'xxx=' and 'xxx' could be considered the same id
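With the type removed, Uid survives only as a non-instantiable codec for ids. Callers that still need a Lucene term derive it on demand from the id, exactly as IndexShard.prepareDelete now does:

import org.apache.lucene.index.Term;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Uid;

// synthesize the _id term from the document id only where Lucene needs a Term
final Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));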

View File: IndexShard.java

@ -1069,14 +1069,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
+ getOperationPrimaryTerm()
+ "]";
ensureWriteAllowed(origin);
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final Engine.Delete delete = prepareDelete(id, uid, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
return delete(engine, delete);
}
private Engine.Delete prepareDelete(
public static Engine.Delete prepareDelete(
String id,
Term uid,
long seqNo,
long primaryTerm,
long version,
@ -1086,6 +1084,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
long ifPrimaryTerm
) {
long startTime = System.nanoTime();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
return new Engine.Delete(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm);
}
@ -2238,7 +2237,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, mapperService, startingSeqNo, endSeqNo, true);
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
}
/**
@ -2270,7 +2269,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
*/
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return getEngine().newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
}
public List<Segment> segments(boolean verbose) {
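Because prepareDelete is now public static, translog recovery can build an Engine.Delete without an IndexShard instance; this mirrors the TranslogHandler change at the end of this diff (versionType and origin are assumed to be in scope):

import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.Translog;

final Translog.Delete delete = (Translog.Delete) operation;
final Engine.Delete engineDelete = IndexShard.prepareDelete(
    delete.id(),
    delete.seqNo(),
    delete.primaryTerm(),
    delete.version(),
    versionType,
    origin,
    SequenceNumbers.UNASSIGNED_SEQ_NO,
    SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);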

View File: Translog.java

@ -33,7 +33,6 @@
package org.opensearch.index.translog;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
@ -54,7 +53,9 @@ import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShardComponent;
@ -1384,7 +1385,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public static final int SERIALIZATION_FORMAT = FORMAT_NO_DOC_TYPE;
private final String id;
private final Term uid;
private final long seqNo;
private final long primaryTerm;
private final long version;
@ -1397,7 +1397,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// Can't assert that this is _doc because pre 2.0 indexes can have any name for a type
}
id = in.readString();
uid = new Term(in.readString(), in.readBytesRef());
if (format < FORMAT_NO_DOC_TYPE) {
final String docType = in.readString();
assert docType.equals(IdFieldMapper.NAME) : docType + " != " + IdFieldMapper.NAME;
in.readBytesRef(); // uid
}
this.version = in.readLong();
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // versionType
@ -1407,17 +1411,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
this(delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion());
this(delete.id(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion());
}
/** utility for testing */
public Delete(String id, long seqNo, long primaryTerm, Term uid) {
this(id, uid, seqNo, primaryTerm, Versions.MATCH_ANY);
public Delete(String id, long seqNo, long primaryTerm) {
this(id, seqNo, primaryTerm, Versions.MATCH_ANY);
}
public Delete(String id, Term uid, long seqNo, long primaryTerm, long version) {
public Delete(String id, long seqNo, long primaryTerm, long version) {
this.id = Objects.requireNonNull(id);
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
@ -1430,18 +1433,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public long estimateSize() {
return (id.length() * 2) + ((uid.field().length() * 2) + (uid.text().length()) * 2) + (3 * Long.BYTES); // seq_no, primary_term,
// and version;
return (id.length() * 2) + (3 * Long.BYTES); // seq_no, primary_term,
// and version;
}
public String id() {
return id;
}
public Term uid() {
return this.uid;
}
@Override
public long seqNo() {
return seqNo;
@ -1468,8 +1467,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.writeString(MapperService.SINGLE_MAPPING_NAME);
}
out.writeString(id);
out.writeString(uid.field());
out.writeBytesRef(uid.bytes());
if (format < FORMAT_NO_DOC_TYPE) {
out.writeString(IdFieldMapper.NAME);
out.writeBytesRef(Uid.encodeId(id));
}
out.writeLong(version);
if (format < FORMAT_NO_VERSION_TYPE) {
out.writeByte(VersionType.EXTERNAL.getValue());
@ -1489,13 +1490,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Delete delete = (Delete) o;
return version == delete.version && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && uid.equals(delete.uid);
return version == delete.version && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm;
}
@Override
public int hashCode() {
int result = uid.hashCode();
result = 31 * result + Long.hashCode(seqNo);
int result = Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
return result;
@ -1503,7 +1503,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public String toString() {
return "Delete{" + "uid=" + uid + ", seqNo=" + seqNo + ", primaryTerm=" + primaryTerm + ", version=" + version + '}';
return "Delete{" + "seqNo=" + seqNo + ", primaryTerm=" + primaryTerm + ", version=" + version + '}';
}
}
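A standalone sketch of the wire-format compatibility above: new writers still emit a synthetic _id uid term for readers older than FORMAT_NO_DOC_TYPE, and new readers discard it. The format constant value here is an assumption; the real constants are private to Translog.Delete:

import java.io.IOException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Uid;

static final int FORMAT_NO_DOC_TYPE = 5; // assumed value, for illustration only

static void writeId(StreamOutput out, int format, String id) throws IOException {
    out.writeString(id);
    if (format < FORMAT_NO_DOC_TYPE) {
        out.writeString(IdFieldMapper.NAME);   // legacy readers expect a (field, bytes) uid term
        out.writeBytesRef(Uid.encodeId(id));
    }
}

static String readId(StreamInput in, int format) throws IOException {
    final String id = in.readString();
    if (format < FORMAT_NO_DOC_TYPE) {
        in.readString();   // discard legacy uid field name ("_id")
        in.readBytesRef(); // discard legacy encoded uid bytes
    }
    return id;
}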

View File: FetchPhase.java

@ -60,7 +60,6 @@ import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.search.SearchContextSourcePrinter;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@ -335,14 +334,14 @@ public class FetchPhase {
} else {
SearchHit hit;
loadStoredFields(context.mapperService(), fieldReader, fieldsVisitor, subDocId);
Uid uid = fieldsVisitor.uid();
String id = fieldsVisitor.id();
if (fieldsVisitor.fields().isEmpty() == false) {
Map<String, DocumentField> docFields = new HashMap<>();
Map<String, DocumentField> metaFields = new HashMap<>();
fillDocAndMetaFields(context, fieldsVisitor, storedToRequestedFields, docFields, metaFields);
hit = new SearchHit(docId, uid.id(), docFields, metaFields);
hit = new SearchHit(docId, id, docFields, metaFields);
} else {
hit = new SearchHit(docId, uid.id(), emptyMap(), emptyMap());
hit = new SearchHit(docId, id, emptyMap(), emptyMap());
}
HitContext hitContext = new HitContext(hit, subReaderContext, subDocId, lookup.source());
@ -375,7 +374,7 @@ public class FetchPhase {
// because the entire _source is only stored with the root document.
boolean needSource = sourceRequired(context) || context.highlight() != null;
Uid rootId;
String rootId;
Map<String, Object> rootSourceAsMap = null;
XContentType rootSourceContentType = null;
@ -383,7 +382,7 @@ public class FetchPhase {
if (context instanceof InnerHitsContext.InnerHitSubContext) {
InnerHitsContext.InnerHitSubContext innerHitsContext = (InnerHitsContext.InnerHitSubContext) context;
rootId = innerHitsContext.getRootId();
rootId = innerHitsContext.getId();
if (needSource) {
SourceLookup rootLookup = innerHitsContext.getRootLookup();
@ -394,7 +393,7 @@ public class FetchPhase {
FieldsVisitor rootFieldsVisitor = new FieldsVisitor(needSource);
loadStoredFields(context.mapperService(), storedFieldReader, rootFieldsVisitor, rootDocId);
rootFieldsVisitor.postProcess(context.mapperService());
rootId = rootFieldsVisitor.uid();
rootId = rootFieldsVisitor.id();
if (needSource) {
if (rootFieldsVisitor.source() != null) {
@ -431,7 +430,7 @@ public class FetchPhase {
nestedObjectMapper
);
SearchHit hit = new SearchHit(nestedTopDocId, rootId.id(), nestedIdentity, docFields, metaFields);
SearchHit hit = new SearchHit(nestedTopDocId, rootId, nestedIdentity, docFields, metaFields);
HitContext hitContext = new HitContext(hit, subReaderContext, nestedDocId, new SourceLookup()); // Use a clean, fresh SourceLookup
// for the nested context

View File: InnerHitsContext.java

@ -45,7 +45,6 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.index.mapper.Uid;
import org.opensearch.search.SearchHit;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.SubSearchContext;
@ -98,8 +97,7 @@ public final class InnerHitsContext {
private InnerHitsContext childInnerHits;
private Weight innerHitQueryWeight;
// TODO: when types are complete removed just use String instead for the id:
private Uid rootId;
private String id;
private SourceLookup rootLookup;
protected InnerHitSubContext(String name, SearchContext context) {
@ -141,12 +139,12 @@ public final class InnerHitsContext {
*
* Since this ID is available on the context, inner hits can avoid re-loading the root _id.
*/
public Uid getRootId() {
return rootId;
public String getId() {
return id;
}
public void setRootId(Uid rootId) {
this.rootId = rootId;
public void setId(String id) {
this.id = id;
}
/**

View File: InnerHitsPhase.java

@ -36,8 +36,6 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.Uid;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.fetch.FetchContext;
@ -96,7 +94,7 @@ public final class InnerHitsPhase implements FetchSubPhase {
docIdsToLoad[j] = topDoc.topDocs.scoreDocs[j].doc;
}
innerHitsContext.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
innerHitsContext.setRootId(new Uid(MapperService.SINGLE_MAPPING_NAME, hit.getId()));
innerHitsContext.setId(hit.getId());
innerHitsContext.setRootLookup(rootLookup);
fetchPhase.execute(innerHitsContext);

View File: InternalEngineTests.java

@ -125,7 +125,6 @@ import org.opensearch.index.VersionType;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParseContext.Document;
import org.opensearch.index.mapper.ParsedDocument;
@ -1464,7 +1463,6 @@ public class InternalEngineTests extends EngineTestCase {
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final MapperService mapperService = createMapperService();
final Set<String> liveDocs = new HashSet<>();
try (
Store store = createStore();
@ -1502,8 +1500,8 @@ public class InternalEngineTests extends EngineTestCase {
safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService).stream()
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine).stream()
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1);
@ -1530,8 +1528,8 @@ public class InternalEngineTests extends EngineTestCase {
engine.syncTranslog();
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
assertThat(readAllOperationsInLucene(engine), hasSize(liveDocs.size()));
}
}
@ -1543,7 +1541,6 @@ public class InternalEngineTests extends EngineTestCase {
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final MapperService mapperService = createMapperService();
final boolean omitSourceAllTheTime = randomBoolean();
final Set<String> liveDocs = new HashSet<>();
final Set<String> liveDocsWithSource = new HashSet<>();
@ -1595,8 +1592,8 @@ public class InternalEngineTests extends EngineTestCase {
minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
}
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService).stream()
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine).stream()
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= engine.getPersistedLocalCheckpoint(); seqno++) {
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
@ -1642,8 +1639,8 @@ public class InternalEngineTests extends EngineTestCase {
engine.syncTranslog();
}
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size()));
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
assertThat(readAllOperationsInLucene(engine), hasSize(liveDocsWithSource.size()));
}
}
@ -3963,7 +3960,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(1, topDocs.totalHits.value);
}
if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService());
List<Translog.Operation> ops = readAllOperationsInLucene(engine);
assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
}
}
@ -4933,8 +4930,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
assertThat(noOp.reason(), equalTo(reason));
if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
MapperService mapperService = createMapperService();
List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine);
assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // fills n gaps and 2 manual noops.
for (int i = 0; i < operationsFromLucene.size(); i++) {
assertThat(
@ -4942,7 +4938,7 @@ public class InternalEngineTests extends EngineTestCase {
equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "filling gaps"))
);
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine);
}
} finally {
IOUtils.close(noOpEngine);
@ -5010,7 +5006,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
List<Translog.Operation> operations = readAllOperationsInLucene(engine, createMapperService());
List<Translog.Operation> operations = readAllOperationsInLucene(engine);
assertThat(operations, hasSize(numOps));
}
}
@ -5167,7 +5163,7 @@ public class InternalEngineTests extends EngineTestCase {
equalTo(0)
);
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
}
}
@ -6120,8 +6116,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
}
}
MapperService mapperService = createMapperService();
List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, mapperService);
List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine);
assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
}
}
@ -6187,10 +6182,9 @@ public class InternalEngineTests extends EngineTestCase {
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
}
}
MapperService mapperService = createMapperService();
List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
List<Translog.Operation> actualOps = readAllOperationsInLucene(engine);
assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
}
}
@ -6277,9 +6271,7 @@ public class InternalEngineTests extends EngineTestCase {
long minRetainSeqNos = engine.getMinRetainedSeqNo();
assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);
Set<Long> actualOps = readAllOperationsInLucene(engine, createMapperService()).stream()
.map(Translog.Operation::seqNo)
.collect(Collectors.toSet());
Set<Long> actualOps = readAllOperationsInLucene(engine).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet());
assertThat(actualOps, containsInAnyOrder(expectedOps));
}
try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
@ -6326,7 +6318,6 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception {
final MapperService mapperService = createMapperService();
final long maxSeqNo = randomLongBetween(10, 50);
final AtomicLong refreshCounter = new AtomicLong();
try (
@ -6371,7 +6362,7 @@ public class InternalEngineTests extends EngineTestCase {
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", mapperService, min, max, true);
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
changes.close();
}
});

View File: LuceneChangesSnapshotTests.java

@ -74,14 +74,14 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long fromSeqNo = randomNonNegativeLong();
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
// Empty engine
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(
error.getMessage(),
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")
);
}
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false)) {
assertThat(snapshot, SnapshotMatchers.size(0));
}
int numOps = between(1, 100);
@ -111,7 +111,6 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
try (
Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
@ -128,7 +127,6 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
try (
Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
@ -151,7 +149,6 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
try (
Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
@ -167,7 +164,6 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
try (
Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
@ -188,7 +184,6 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
try (
Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher,
mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
@ -204,7 +199,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
// Get snapshot via engine will auto refresh
fromSeqNo = randomLongBetween(0, numOps - 1);
toSeqNo = randomLongBetween(fromSeqNo, numOps - 1);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean())) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}
}
@ -235,7 +230,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
engine.refresh("test");
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, between(1, 100), 0, maxSeqNo, false)) {
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
@ -311,7 +306,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long fromSeqNo = followerCheckpoint + 1;
long batchSize = randomLongBetween(0, 100);
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
translogHandler.run(follower, snapshot);
}
}
@ -327,7 +322,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
.getProcessedCheckpoint()) {
pullOperations(engine);
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
// have to verify without source since we are randomly testing without _source
List<DocIdSeqNoAndSource> docsWithoutSourceOnFollower = getDocIds(engine, true).stream()
.map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
@ -357,7 +352,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
public void testOverFlow() throws Exception {
long fromSeqNo = randomLongBetween(0, 5);
long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(
error.getMessage(),

View File: TranslogTests.java

@ -364,7 +364,7 @@ public class TranslogTests extends OpenSearchTestCase {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}
addToTranslogAndList(translog, ops, new Translog.Delete("2", 1, primaryTerm.get(), newUid("2")));
addToTranslogAndList(translog, ops, new Translog.Delete("2", 1, primaryTerm.get()));
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
@ -383,7 +383,7 @@ public class TranslogTests extends OpenSearchTestCase {
Translog.Delete delete = (Translog.Delete) snapshot.next();
assertNotNull(delete);
assertThat(delete.uid(), equalTo(newUid("2")));
assertThat(delete.id(), equalTo("2"));
Translog.NoOp noOp = (Translog.NoOp) snapshot.next();
assertNotNull(noOp);
@ -465,23 +465,23 @@ public class TranslogTests extends OpenSearchTestCase {
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
translog.add(new Translog.Delete("2", 1, primaryTerm.get(), newUid("2")));
translog.add(new Translog.Delete("2", 1, primaryTerm.get()));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2));
assertThat(stats.getTranslogSizeInBytes(), equalTo(200L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(193L));
assertThat(stats.getUncommittedOperations(), equalTo(2));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(145L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(138L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
translog.add(new Translog.Delete("3", 2, primaryTerm.get(), newUid("3")));
translog.add(new Translog.Delete("3", 2, primaryTerm.get()));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3));
assertThat(stats.getTranslogSizeInBytes(), equalTo(243L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(229L));
assertThat(stats.getUncommittedOperations(), equalTo(3));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(188L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(174L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
@ -489,9 +489,9 @@ public class TranslogTests extends OpenSearchTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(285L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(271L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(230L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(216L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
@ -499,9 +499,9 @@ public class TranslogTests extends OpenSearchTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(340L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(326L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(285L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(271L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
@ -511,7 +511,7 @@ public class TranslogTests extends OpenSearchTestCase {
stats.writeTo(out);
final TranslogStats copy = new TranslogStats(out.bytes().streamInput());
assertThat(copy.estimatedNumberOfOperations(), equalTo(4));
assertThat(copy.getTranslogSizeInBytes(), equalTo(340L));
assertThat(copy.getTranslogSizeInBytes(), equalTo(326L));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.startObject();
@ -521,9 +521,9 @@ public class TranslogTests extends OpenSearchTestCase {
Strings.toString(builder),
equalTo(
"{\"translog\":{\"operations\":4,\"size_in_bytes\":"
+ 340
+ 326
+ ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":"
+ 285
+ 271
+ ",\"earliest_last_modified_age\":"
+ stats.getEarliestLastModifiedAge()
+ "}}"
@ -537,7 +537,7 @@ public class TranslogTests extends OpenSearchTestCase {
long lastModifiedAge = System.currentTimeMillis() - translog.getCurrent().getLastModifiedTime();
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(340L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(326L));
assertThat(stats.getUncommittedOperations(), equalTo(0));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition));
assertThat(stats.getEarliestLastModifiedAge(), greaterThanOrEqualTo(lastModifiedAge));
@ -922,7 +922,7 @@ public class TranslogTests extends OpenSearchTestCase {
case DELETE:
Translog.Delete delOp = (Translog.Delete) op;
Translog.Delete expDelOp = (Translog.Delete) expectedOp;
assertEquals(expDelOp.uid(), delOp.uid());
assertEquals(expDelOp.id(), delOp.id());
assertEquals(expDelOp.version(), delOp.version());
break;
case NO_OP:
@ -1076,7 +1076,7 @@ public class TranslogTests extends OpenSearchTestCase {
op = new Translog.Index("" + id, id, primaryTerm.get(), new byte[] { (byte) id });
break;
case DELETE:
op = new Translog.Delete(Long.toString(id), id, primaryTerm.get(), newUid(Long.toString(id)));
op = new Translog.Delete(Long.toString(id), id, primaryTerm.get());
break;
case NO_OP:
op = new Translog.NoOp(id, 1, Long.toString(id));
@ -2414,7 +2414,6 @@ public class TranslogTests extends OpenSearchTestCase {
case DELETE:
op = new Translog.Delete(
threadId + "_" + opCount,
new Term("_uid", threadId + "_" + opCount),
seqNoGenerator.getAndIncrement(),
primaryTerm.get(),
1 + randomInt(100000)

View File: RecoverySourceHandlerTests.java

@ -1188,13 +1188,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
if (randomBoolean()) {
op = new Translog.Index("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong(), source, null, -1);
} else if (randomBoolean()) {
op = new Translog.Delete(
"id",
new Term("_id", Uid.encodeId("id")),
seqNo,
randomNonNegativeLong(),
randomNonNegativeLong()
);
op = new Translog.Delete("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong());
} else {
op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test");
}

View File: EngineTestCase.java

@ -329,14 +329,14 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
assertNoInFlightDocuments(engine);
assertMaxSeqNoInCommitUserData(engine);
assertAtMostOneLuceneDocumentPerSequenceNumber(engine);
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine);
assertNoInFlightDocuments(replicaEngine);
assertMaxSeqNoInCommitUserData(replicaEngine);
assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine);
@ -1310,9 +1310,9 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
* Reads all engine operations that have been processed by the engine from Lucene index.
* The returned operations are sorted and de-duplicated, thus each sequence number will have at most one operation.
*/
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
@ -1324,9 +1324,9 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
/**
* Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source.
*/
public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine, MapperService mapper) throws IOException {
public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
@ -1338,8 +1338,8 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
/**
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper == null || mapper.documentMapper() == null || (engine instanceof InternalEngine) == false) {
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine) throws IOException {
if (engine instanceof InternalEngine == false) {
return;
}
final List<Translog.Operation> translogOps = new ArrayList<>();
@ -1349,7 +1349,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
translogOps.add(op);
}
}
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine).stream()
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
for (Translog.Operation op : translogOps) {

View File: TranslogHandler.java

@ -147,19 +147,16 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
return engineIndex;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
final Engine.Delete engineDelete = new Engine.Delete(
return IndexShard.prepareDelete(
delete.id(),
delete.uid(),
delete.seqNo(),
delete.primaryTerm(),
delete.version(),
versionType,
origin,
System.nanoTime(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);
return engineDelete;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
final Engine.NoOp engineNoOp = new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());

View File: IndexShardTestCase.java

@ -868,7 +868,7 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
}
final Engine engine = shard.getEngineOrNull();
if (engine != null) {
EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, shard.mapperService());
EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
}
}