[CCR] Read changes from Lucene instead of translog (#30120)
This commit adds an API to read a translog snapshot from Lucene, then cuts over from the existing translog to the new API in CCR. Relates #30086 Relates #29530
This commit is contained in:
parent
5d99157236
commit
bb6586dc5f
|
@ -58,6 +58,7 @@ import org.elasticsearch.common.metrics.CounterMetric;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
|
@ -609,6 +610,12 @@ public abstract class Engine implements Closeable {
|
|||
return getTranslog().getLastWriteLocation();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new "translog" snapshot from Lucene for reading operations whose seqno is in the requested seqno range
|
||||
*/
|
||||
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
|
||||
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;
|
||||
|
||||
protected final void ensureOpen(Exception suppressed) {
|
||||
if (isClosed.get()) {
|
||||
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
|
@ -148,6 +149,7 @@ public class InternalEngine extends Engine {
|
|||
private final CounterMetric numDocUpdates = new CounterMetric();
|
||||
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
|
||||
private final boolean softDeleteEnabled;
|
||||
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
|
||||
|
||||
/**
|
||||
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
|
||||
|
@ -224,6 +226,8 @@ public class InternalEngine extends Engine {
|
|||
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
|
||||
this.internalSearcherManager.addListener(listener);
|
||||
}
|
||||
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
|
||||
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
|
@ -2345,6 +2349,23 @@ public class InternalEngine extends Engine {
|
|||
return numDocUpdates.count();
|
||||
}
|
||||
|
||||
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
|
||||
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
|
||||
// TODO: Should we defer the refresh until we really need it?
|
||||
ensureOpen();
|
||||
if (lastRefreshedCheckpoint() < maxSeqNo) {
|
||||
refresh(source, SearcherScope.INTERNAL);
|
||||
}
|
||||
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
|
||||
try {
|
||||
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
|
||||
searcher = null;
|
||||
return snapshot;
|
||||
} finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecovering() {
|
||||
return pendingTranslogRecovery.get();
|
||||
|
@ -2391,4 +2412,28 @@ public class InternalEngine extends Engine {
|
|||
return super.softUpdateDocuments(term, docs, softDeletes);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the last local checkpoint value that has been refreshed internally.
|
||||
*/
|
||||
final long lastRefreshedCheckpoint() {
|
||||
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
|
||||
}
|
||||
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
|
||||
final AtomicLong refreshedCheckpoint;
|
||||
private long pendingCheckpoint;
|
||||
LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
|
||||
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
|
||||
}
|
||||
@Override
|
||||
public void beforeRefresh() {
|
||||
pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // All change until this point should be visible after refresh
|
||||
}
|
||||
@Override
|
||||
public void afterRefresh(boolean didRefresh) {
|
||||
if (didRefresh) {
|
||||
refreshedCheckpoint.set(pendingCheckpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,280 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.document.LongPoint;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.index.ReaderUtil;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.apache.lucene.search.SortedNumericSortField;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A {@link Translog.Snapshot} from changes in a Lucene index
|
||||
*/
|
||||
final class LuceneChangesSnapshot implements Translog.Snapshot {
|
||||
private final long fromSeqNo, toSeqNo;
|
||||
private long lastSeenSeqNo;
|
||||
private int skippedOperations;
|
||||
private final boolean requiredFullRange;
|
||||
|
||||
private final IndexSearcher indexSearcher;
|
||||
private final MapperService mapperService;
|
||||
private int docIndex = 0;
|
||||
private final TopDocs topDocs;
|
||||
|
||||
private final Closeable onClose;
|
||||
private final CombinedDocValues[] docValues; // Cache of DocValues
|
||||
|
||||
/**
 * Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
 * The matching documents are searched eagerly here; {@link #next()} then walks over the hits.
 *
 * @param engineSearcher    the internal engine searcher which will be taken over if the snapshot is opened successfully;
 *                          it is closed when this snapshot is closed
 * @param mapperService     the mapper service which will be mainly used to resolve the document's type and uid
 * @param fromSeqNo         the min requesting seq# - inclusive
 * @param toSeqNo           the maximum requesting seq# - inclusive
 * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
 * @throws IllegalArgumentException if either bound is negative or {@code fromSeqNo} is greater than {@code toSeqNo}
 */
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
                      long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
    final boolean validRange = fromSeqNo >= 0 && toSeqNo >= 0 && fromSeqNo <= toSeqNo;
    if (validRange == false) {
        throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
    }
    this.fromSeqNo = fromSeqNo;
    this.toSeqNo = toSeqNo;
    this.requiredFullRange = requiredFullRange;
    this.mapperService = mapperService;
    // Nothing has been read yet, so the first expected seq# is fromSeqNo.
    this.lastSeenSeqNo = fromSeqNo - 1;

    final IndexSearcher searcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
    searcher.setQueryCache(null);
    this.indexSearcher = searcher;
    this.topDocs = searchOperations(searcher);

    // One doc-values cache per segment, addressed by the ordinal of the leaf.
    final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
    final CombinedDocValues[] perLeaf = new CombinedDocValues[leaves.size()];
    for (LeafReaderContext leaf : leaves) {
        perLeaf[leaf.ord] = new CombinedDocValues(leaf.reader());
    }
    this.docValues = perLeaf;
    this.onClose = engineSearcher;
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
onClose.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return Math.toIntExact(topDocs.totalHits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int overriddenOperations() {
|
||||
return skippedOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Operation next() throws IOException {
|
||||
Translog.Operation op = null;
|
||||
for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
|
||||
op = readDocAsOp(docId);
|
||||
if (op != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (requiredFullRange) {
|
||||
rangeCheck(op);
|
||||
}
|
||||
if (op != null) {
|
||||
lastSeenSeqNo = op.seqNo();
|
||||
}
|
||||
return op;
|
||||
}
|
||||
|
||||
private void rangeCheck(Translog.Operation op) {
|
||||
if (op == null) {
|
||||
if (lastSeenSeqNo < toSeqNo) {
|
||||
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
|
||||
"and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
|
||||
}
|
||||
} else {
|
||||
final long expectedSeqNo = lastSeenSeqNo + 1;
|
||||
if (op.seqNo() != expectedSeqNo) {
|
||||
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
|
||||
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int nextDocId() {
|
||||
if (docIndex < topDocs.scoreDocs.length) {
|
||||
final int docId = topDocs.scoreDocs[docIndex].doc;
|
||||
docIndex++;
|
||||
return docId;
|
||||
} else {
|
||||
return DocIdSetIterator.NO_MORE_DOCS;
|
||||
}
|
||||
}
|
||||
|
||||
private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
|
||||
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
|
||||
final Sort sortedBySeqNoThenByTerm = new Sort(
|
||||
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
|
||||
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
|
||||
);
|
||||
return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
|
||||
}
|
||||
|
||||
private Translog.Operation readDocAsOp(int docID) throws IOException {
|
||||
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
|
||||
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
|
||||
final int segmentDocID = docID - leaf.docBase;
|
||||
final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
|
||||
// We don't have to read the nested child documents - those docs don't have primary terms.
|
||||
if (primaryTerm == -1) {
|
||||
skippedOperations++;
|
||||
return null;
|
||||
}
|
||||
final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
|
||||
// Only pick the first seen seq#
|
||||
if (seqNo == lastSeenSeqNo) {
|
||||
skippedOperations++;
|
||||
return null;
|
||||
}
|
||||
final long version = docValues[leaf.ord].docVersion(segmentDocID);
|
||||
final FieldsVisitor fields = new FieldsVisitor(true);
|
||||
indexSearcher.doc(docID, fields);
|
||||
fields.postProcess(mapperService);
|
||||
|
||||
final Translog.Operation op;
|
||||
final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
|
||||
if (isTombstone && fields.uid() == null) {
|
||||
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields?
|
||||
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
|
||||
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]";
|
||||
} else {
|
||||
final String id = fields.uid().id();
|
||||
final String type = fields.uid().type();
|
||||
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
|
||||
if (isTombstone) {
|
||||
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
|
||||
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
|
||||
} else {
|
||||
final BytesReference source = fields.source();
|
||||
// TODO: pass the latest timestamp from engine.
|
||||
final long autoGeneratedIdTimestamp = -1;
|
||||
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
|
||||
source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
|
||||
}
|
||||
}
|
||||
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
|
||||
"last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]";
|
||||
return op;
|
||||
}
|
||||
|
||||
private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException {
|
||||
final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD);
|
||||
if (ndv == null || ndv.advanceExact(segmentDocId) == false) {
|
||||
throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETE_FIELD + "] is not found");
|
||||
}
|
||||
return ndv.longValue() == 1;
|
||||
}
|
||||
|
||||
private static final class CombinedDocValues {
|
||||
private final LeafReader leafReader;
|
||||
private NumericDocValues versionDV;
|
||||
private NumericDocValues seqNoDV;
|
||||
private NumericDocValues primaryTermDV;
|
||||
private NumericDocValues tombstoneDV;
|
||||
|
||||
CombinedDocValues(LeafReader leafReader) throws IOException {
|
||||
this.leafReader = leafReader;
|
||||
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
|
||||
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
|
||||
this.primaryTermDV = Objects.requireNonNull(
|
||||
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
|
||||
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
|
||||
}
|
||||
|
||||
long docVersion(int segmentDocId) throws IOException {
|
||||
if (versionDV.docID() > segmentDocId) {
|
||||
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
|
||||
}
|
||||
if (versionDV.advanceExact(segmentDocId) == false) {
|
||||
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
|
||||
}
|
||||
return versionDV.longValue();
|
||||
}
|
||||
|
||||
long docSeqNo(int segmentDocId) throws IOException {
|
||||
if (seqNoDV.docID() > segmentDocId) {
|
||||
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
|
||||
}
|
||||
if (seqNoDV.advanceExact(segmentDocId) == false) {
|
||||
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
|
||||
}
|
||||
return seqNoDV.longValue();
|
||||
}
|
||||
|
||||
long docPrimaryTerm(int segmentDocId) throws IOException {
|
||||
if (primaryTermDV == null) {
|
||||
return -1L;
|
||||
}
|
||||
if (primaryTermDV.docID() > segmentDocId) {
|
||||
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
}
|
||||
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
|
||||
if (primaryTermDV.advanceExact(segmentDocId) == false) {
|
||||
return -1;
|
||||
}
|
||||
return primaryTermDV.longValue();
|
||||
}
|
||||
|
||||
boolean isTombstone(int segmentDocId) throws IOException {
|
||||
if (tombstoneDV == null) {
|
||||
return false;
|
||||
}
|
||||
if (tombstoneDV.docID() > segmentDocId) {
|
||||
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
|
||||
}
|
||||
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1595,11 +1595,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* The caller has to close the returned snapshot after finishing the reading.
|
||||
*/
|
||||
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
return newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
|
||||
return getEngine().newTranslogSnapshotBetween(minSeqNo, maxSeqNo);
|
||||
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1609,6 +1605,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new "translog" snapshot from Lucene for reading operations whose seqno is between minSeqNo and maxSeqNo.
|
||||
* The caller has to close the returned snapshot after finishing the reading.
|
||||
*
|
||||
* @param source the source of the request
|
||||
* @param minSeqNo the min_seqno to read - inclusive
|
||||
* @param maxSeqNo the max_seqno to read - inclusive
|
||||
* @param requiredFullRange if true then {@link Translog.Snapshot#next()} will throw {@link IllegalStateException}
|
||||
* if any operation between minSeqNo and maxSeqNo is missing. This parameter should be only
|
||||
* enabled when the requesting range is below the global checkpoint.
|
||||
*/
|
||||
public Translog.Snapshot newLuceneChangesSnapshot(String source, long minSeqNo, long maxSeqNo,
|
||||
boolean requiredFullRange) throws IOException {
|
||||
return getEngine().newLuceneChangesSnapshot(source, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
|
||||
}
|
||||
|
||||
public List<Segment> segments(boolean verbose) {
|
||||
return getEngine().segments(verbose);
|
||||
}
|
||||
|
|
|
@ -1545,42 +1545,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertVisibleCount(engine, totalExpectedOps);
|
||||
}
|
||||
|
||||
private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
|
||||
Thread[] thread = new Thread[randomIntBetween(3, 5)];
|
||||
CountDownLatch startGun = new CountDownLatch(thread.length);
|
||||
AtomicInteger offset = new AtomicInteger(-1);
|
||||
for (int i = 0; i < thread.length; i++) {
|
||||
thread[i] = new Thread(() -> {
|
||||
startGun.countDown();
|
||||
try {
|
||||
startGun.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
int docOffset;
|
||||
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
|
||||
try {
|
||||
final Engine.Operation op = ops.get(docOffset);
|
||||
if (op instanceof Engine.Index) {
|
||||
engine.index((Engine.Index) op);
|
||||
} else {
|
||||
engine.delete((Engine.Delete) op);
|
||||
}
|
||||
if ((docOffset + 1) % 4 == 0) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
thread[i].start();
|
||||
}
|
||||
for (int i = 0; i < thread.length; i++) {
|
||||
thread[i].join();
|
||||
}
|
||||
}
|
||||
|
||||
public void testInternalVersioningOnPrimary() throws IOException {
|
||||
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1");
|
||||
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.SnapshotMatchers;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class LuceneChangesSnapshotTests extends EngineTestCase {
|
||||
private MapperService mapperService;
|
||||
|
||||
@Before
|
||||
public void createMapper() throws Exception {
|
||||
mapperService = createMapperService("test");
|
||||
}
|
||||
|
||||
public void testBasics() throws Exception {
|
||||
long fromSeqNo = randomNonNegativeLong();
|
||||
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
|
||||
// Empty engine
|
||||
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
|
||||
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
|
||||
assertThat(error.getMessage(),
|
||||
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
|
||||
}
|
||||
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
|
||||
assertThat(snapshot, SnapshotMatchers.size(0));
|
||||
}
|
||||
int numOps = between(1, 100);
|
||||
int refreshedSeqNo = -1;
|
||||
for (int i = 0; i < numOps; i++) {
|
||||
String id = Integer.toString(randomIntBetween(i, i + 5));
|
||||
ParsedDocument doc = createParsedDoc(id, null);
|
||||
if (randomBoolean()) {
|
||||
engine.index(indexForDoc(doc));
|
||||
} else {
|
||||
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
|
||||
}
|
||||
if (rarely()) {
|
||||
if (randomBoolean()) {
|
||||
engine.flush();
|
||||
} else {
|
||||
engine.refresh("test");
|
||||
}
|
||||
refreshedSeqNo = i;
|
||||
}
|
||||
}
|
||||
if (refreshedSeqNo == -1) {
|
||||
fromSeqNo = between(0, numOps);
|
||||
toSeqNo = randomLongBetween(fromSeqNo, numOps * 2);
|
||||
|
||||
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
|
||||
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
|
||||
searcher = null;
|
||||
assertThat(snapshot, SnapshotMatchers.size(0));
|
||||
} finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
|
||||
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
|
||||
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
|
||||
searcher = null;
|
||||
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
|
||||
assertThat(error.getMessage(),
|
||||
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
|
||||
}finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
}else {
|
||||
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
|
||||
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
|
||||
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
|
||||
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
|
||||
searcher = null;
|
||||
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
|
||||
}finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
|
||||
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
|
||||
searcher = null;
|
||||
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
|
||||
assertThat(error.getMessage(),
|
||||
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
|
||||
}finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo);
|
||||
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
|
||||
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
|
||||
searcher = null;
|
||||
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
|
||||
}finally {
|
||||
IOUtils.close(searcher);
|
||||
}
|
||||
}
|
||||
// Get snapshot via engine will auto refresh
|
||||
fromSeqNo = randomLongBetween(0, numOps - 1);
|
||||
toSeqNo = randomLongBetween(fromSeqNo, numOps - 1);
|
||||
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean())) {
|
||||
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDedupByPrimaryTerm() throws Exception {
|
||||
Map<Long, Long> latestOperations = new HashMap<>();
|
||||
List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000));
|
||||
int totalOps = 0;
|
||||
for (long term : terms) {
|
||||
final List<Engine.Operation> ops = generateSingleDocHistory(true,
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), term, 2, 20, "1");
|
||||
primaryTerm.set(Math.max(primaryTerm.get(), term));
|
||||
engine.rollTranslogGeneration();
|
||||
for (Engine.Operation op : ops) {
|
||||
// We need to simulate a rollback here as only ops after local checkpoint get into the engine
|
||||
if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) {
|
||||
engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1));
|
||||
engine.rollTranslogGeneration();
|
||||
}
|
||||
if (op instanceof Engine.Index) {
|
||||
engine.index((Engine.Index) op);
|
||||
} else if (op instanceof Engine.Delete) {
|
||||
engine.delete((Engine.Delete) op);
|
||||
}
|
||||
latestOperations.put(op.seqNo(), op.primaryTerm());
|
||||
if (rarely()) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
totalOps++;
|
||||
}
|
||||
}
|
||||
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
|
||||
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, 0, maxSeqNo, false)) {
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
|
||||
}
|
||||
assertThat(snapshot.overriddenOperations(), equalTo(totalOps - latestOperations.size()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateAndReadChangesConcurrently() throws Exception {
|
||||
Follower[] followers = new Follower[between(1, 3)];
|
||||
CountDownLatch readyLatch = new CountDownLatch(followers.length + 1);
|
||||
AtomicBoolean isDone = new AtomicBoolean();
|
||||
for (int i = 0; i < followers.length; i++) {
|
||||
followers[i] = new Follower(engine, isDone, readyLatch);
|
||||
followers[i].start();
|
||||
}
|
||||
boolean onPrimary = randomBoolean();
|
||||
List<Engine.Operation> operations = new ArrayList<>();
|
||||
int numOps = scaledRandomIntBetween(1, 1000);
|
||||
for (int i = 0; i < numOps; i++) {
|
||||
String id = Integer.toString(randomIntBetween(1, 10));
|
||||
ParsedDocument doc = createParsedDoc(id, randomAlphaOfLengthBetween(1, 5));
|
||||
final Engine.Operation op;
|
||||
if (onPrimary) {
|
||||
if (randomBoolean()) {
|
||||
op = new Engine.Index(newUid(doc), primaryTerm.get(), doc);
|
||||
} else {
|
||||
op = new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get());
|
||||
}
|
||||
} else {
|
||||
if (randomBoolean()) {
|
||||
op = replicaIndexForDoc(doc, randomNonNegativeLong(), i, randomBoolean());
|
||||
} else {
|
||||
op = replicaDeleteForDoc(doc.id(), randomNonNegativeLong(), i, randomNonNegativeLong());
|
||||
}
|
||||
}
|
||||
operations.add(op);
|
||||
}
|
||||
readyLatch.countDown();
|
||||
concurrentlyApplyOps(operations, engine);
|
||||
assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(operations.size() - 1L));
|
||||
isDone.set(true);
|
||||
for (Follower follower : followers) {
|
||||
follower.join();
|
||||
}
|
||||
}
|
||||
|
||||
class Follower extends Thread {
|
||||
private final Engine leader;
|
||||
private final TranslogHandler translogHandler;
|
||||
private final AtomicBoolean isDone;
|
||||
private final CountDownLatch readLatch;
|
||||
|
||||
Follower(Engine leader, AtomicBoolean isDone, CountDownLatch readLatch) {
|
||||
this.leader = leader;
|
||||
this.isDone = isDone;
|
||||
this.readLatch = readLatch;
|
||||
this.translogHandler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
|
||||
engine.engineConfig.getIndexSettings().getSettings()));
|
||||
}
|
||||
|
||||
void pullOperations(Engine follower) throws IOException {
|
||||
long leaderCheckpoint = leader.getLocalCheckpointTracker().getCheckpoint();
|
||||
long followerCheckpoint = follower.getLocalCheckpointTracker().getCheckpoint();
|
||||
if (followerCheckpoint < leaderCheckpoint) {
|
||||
long fromSeqNo = followerCheckpoint + 1;
|
||||
long batchSize = randomLongBetween(0, 100);
|
||||
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
|
||||
try (Translog.Snapshot snapshot = leader.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
|
||||
translogHandler.run(follower, snapshot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (Store store = createStore();
|
||||
InternalEngine follower = createEngine(store, createTempDir())) {
|
||||
readLatch.countDown();
|
||||
readLatch.await();
|
||||
while (isDone.get() == false ||
|
||||
follower.getLocalCheckpointTracker().getCheckpoint() < leader.getLocalCheckpointTracker().getCheckpoint()) {
|
||||
pullOperations(follower);
|
||||
}
|
||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, mapperService);
|
||||
assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
|
||||
} catch (Exception ex) {
|
||||
throw new AssertionError(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException {
|
||||
List<Translog.Operation> operations = new ArrayList<>();
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
final Translog.Operation newOp = op;
|
||||
logger.error("Reading [{}]", op);
|
||||
assert operations.stream().allMatch(o -> o.seqNo() < newOp.seqNo()) : "Operations [" + operations + "], op [" + op + "]";
|
||||
operations.add(newOp);
|
||||
}
|
||||
return operations;
|
||||
}
|
||||
}
|
|
@ -22,29 +22,26 @@ package org.elasticsearch.index.engine;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.document.StoredField;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.LiveIndexWriterConfig;
|
||||
import org.apache.lucene.index.MergePolicy;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.index.ReaderUtil;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.apache.lucene.search.SortedNumericSortField;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TotalHitCountCollector;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
@ -67,7 +64,6 @@ import org.elasticsearch.index.IndexSettings;
|
|||
import org.elasticsearch.index.MapperTestUtils;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
|
@ -101,8 +97,12 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
@ -116,7 +116,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
|||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public abstract class EngineTestCase extends ESTestCase {
|
||||
|
@ -568,7 +567,7 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) {
|
||||
return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL,
|
||||
return new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), version, VersionType.EXTERNAL,
|
||||
Engine.Operation.Origin.REPLICA, startTime);
|
||||
}
|
||||
protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
|
||||
|
@ -705,82 +704,87 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
|
||||
Thread[] thread = new Thread[randomIntBetween(3, 5)];
|
||||
CountDownLatch startGun = new CountDownLatch(thread.length);
|
||||
AtomicInteger offset = new AtomicInteger(-1);
|
||||
for (int i = 0; i < thread.length; i++) {
|
||||
thread[i] = new Thread(() -> {
|
||||
startGun.countDown();
|
||||
try {
|
||||
startGun.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
int docOffset;
|
||||
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
|
||||
try {
|
||||
final Engine.Operation op = ops.get(docOffset);
|
||||
if (op instanceof Engine.Index) {
|
||||
engine.index((Engine.Index) op);
|
||||
} else if (op instanceof Engine.Delete){
|
||||
engine.delete((Engine.Delete) op);
|
||||
} else {
|
||||
engine.noOp((Engine.NoOp) op);
|
||||
}
|
||||
if ((docOffset + 1) % 4 == 0) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
thread[i].start();
|
||||
}
|
||||
for (int i = 0; i < thread.length; i++) {
|
||||
thread[i].join();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all docId from the given engine.
|
||||
*/
|
||||
public static Set<String> getDocIds(Engine engine, boolean refresh) throws IOException {
|
||||
if (refresh) {
|
||||
engine.refresh("test_get_doc_ids");
|
||||
}
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) {
|
||||
Set<String> ids = new HashSet<>();
|
||||
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
|
||||
LeafReader reader = leafContext.reader();
|
||||
Bits liveDocs = reader.getLiveDocs();
|
||||
for (int i = 0; i < reader.maxDoc(); i++) {
|
||||
if (liveDocs == null || liveDocs.get(i)) {
|
||||
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
|
||||
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
|
||||
ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads all engine operations that have been processed by the engine from Lucene index.
|
||||
* The returned operations are sorted and de-duplicated, thus each sequence number will have at most one operation.
|
||||
*/
|
||||
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
|
||||
engine.refresh("test");
|
||||
final List<Translog.Operation> operations = new ArrayList<>();
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
|
||||
final IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
|
||||
final Sort sortedBySeqNoThenByTerm = new Sort(
|
||||
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
|
||||
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
|
||||
);
|
||||
final TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
|
||||
long lastSeenSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
|
||||
for (ScoreDoc scoreDoc : allDocs.scoreDocs) {
|
||||
final Translog.Operation op = readOperationInLucene(indexSearcher, mapper, scoreDoc.doc);
|
||||
if (op.seqNo() != lastSeenSeqNo) {
|
||||
long maxSeqNo = Math.max(0, engine.getLocalCheckpointTracker().getMaxSeqNo());
|
||||
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapper, 0, maxSeqNo, false)) {
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null){
|
||||
operations.add(op);
|
||||
lastSeenSeqNo = op.seqNo();
|
||||
}
|
||||
}
|
||||
}
|
||||
return operations;
|
||||
}
|
||||
|
||||
private static Translog.Operation readOperationInLucene(IndexSearcher searcher, MapperService mapper, int docID) throws IOException {
|
||||
final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
|
||||
final int leafIndex = ReaderUtil.subIndex(docID, leaves);
|
||||
final int segmentDocID = docID - leaves.get(leafIndex).docBase;
|
||||
final long seqNo = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.NAME, segmentDocID);
|
||||
final long primaryTerm = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID);
|
||||
final long version = readNumericDV(leaves.get(leafIndex), VersionFieldMapper.NAME, segmentDocID);
|
||||
final FieldsVisitor fields = new FieldsVisitor(true);
|
||||
searcher.doc(docID, fields);
|
||||
fields.postProcess(mapper);
|
||||
final Translog.Operation op;
|
||||
final boolean isTombstone = isTombstoneOperation(leaves.get(leafIndex), segmentDocID);
|
||||
if (isTombstone && fields.uid() == null) {
|
||||
op = new Translog.NoOp(seqNo, primaryTerm, "");
|
||||
assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
|
||||
: "Noop operation but soft_deletes field is not set";
|
||||
assert version == 1 : "Noop tombstone should have version 1L; actual version [" + version + "]";
|
||||
} else {
|
||||
final String id = fields.uid().id();
|
||||
final String type = fields.uid().type();
|
||||
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
|
||||
if (isTombstone) {
|
||||
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
|
||||
assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
|
||||
: "Delete operation but soft_deletes field is not set";
|
||||
} else {
|
||||
final BytesReference source = fields.source();
|
||||
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL, source.toBytesRef().bytes,
|
||||
fields.routing(), -1);
|
||||
}
|
||||
}
|
||||
return op;
|
||||
}
|
||||
|
||||
private static boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException {
|
||||
final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
|
||||
if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) {
|
||||
return tombstoneDV.longValue() == 1;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException {
|
||||
final NumericDocValues dv = leaf.reader().getNumericDocValues(field);
|
||||
if (dv == null || dv.advanceExact(segmentDocID) == false) {
|
||||
throw new IllegalStateException("DocValues for field [" + field + "] is not found");
|
||||
}
|
||||
return dv.longValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the provided engine has a consistent document history between translog and Lucene index.
|
||||
*/
|
||||
|
@ -811,7 +815,7 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
assertThat(luceneOp, notNullValue());
|
||||
assertThat(luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
|
||||
assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
|
||||
assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
|
||||
if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
|
||||
assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
|
||||
|
|
|
@ -518,22 +518,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
protected Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
|
||||
shard.refresh("get_uids");
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
Set<String> ids = new HashSet<>();
|
||||
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
|
||||
LeafReader reader = leafContext.reader();
|
||||
Bits liveDocs = reader.getLiveDocs();
|
||||
for (int i = 0; i < reader.maxDoc(); i++) {
|
||||
if (liveDocs == null || liveDocs.get(i)) {
|
||||
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
|
||||
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
|
||||
ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
return EngineTestCase.getDocIds(shard.getEngine(), true);
|
||||
}
|
||||
|
||||
protected void assertDocCount(IndexShard shard, int docDount) throws IOException {
|
||||
|
|
|
@ -44,6 +44,10 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
final String leaderIndexName = "test_index1";
|
||||
if (runningAgainstLeaderCluster) {
|
||||
logger.info("Running against leader cluster");
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.build();
|
||||
createIndex(leaderIndexName, indexSettings);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
logger.info("Indexing doc [{}]", i);
|
||||
index(client(), leaderIndexName, Integer.toString(i), "field", i);
|
||||
|
|
|
@ -34,11 +34,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
|
@ -227,6 +224,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.getShard().id());
|
||||
|
||||
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
|
||||
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
|
||||
}
|
||||
|
||||
|
@ -256,42 +254,18 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
if (indexShard.state() != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
|
||||
}
|
||||
|
||||
long seenBytes = 0;
|
||||
long nextExpectedSeqNo = minSeqNo;
|
||||
final Queue<Translog.Operation> orderedOps = new PriorityQueue<>(Comparator.comparingLong(Translog.Operation::seqNo));
|
||||
|
||||
int seenBytes = 0;
|
||||
final List<Translog.Operation> operations = new ArrayList<>();
|
||||
try (Translog.Snapshot snapshot = indexShard.newTranslogSnapshotBetween(minSeqNo, maxSeqNo)) {
|
||||
for (Translog.Operation unorderedOp = snapshot.next(); unorderedOp != null; unorderedOp = snapshot.next()) {
|
||||
if (unorderedOp.seqNo() < minSeqNo || unorderedOp.seqNo() > maxSeqNo) {
|
||||
continue;
|
||||
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) {
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
operations.add(op);
|
||||
seenBytes += op.estimateSize();
|
||||
if (seenBytes > byteLimit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
orderedOps.add(unorderedOp);
|
||||
while (orderedOps.peek() != null && orderedOps.peek().seqNo() == nextExpectedSeqNo) {
|
||||
Translog.Operation orderedOp = orderedOps.poll();
|
||||
if (seenBytes < byteLimit) {
|
||||
nextExpectedSeqNo++;
|
||||
seenBytes += orderedOp.estimateSize();
|
||||
operations.add(orderedOp);
|
||||
if (nextExpectedSeqNo > maxSeqNo) {
|
||||
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
|
||||
}
|
||||
} else {
|
||||
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nextExpectedSeqNo >= maxSeqNo) {
|
||||
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
|
||||
} else {
|
||||
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
|
||||
"] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -219,6 +221,61 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testFollowIndexWithNestedField() throws Exception {
|
||||
final String leaderIndexSettings =
|
||||
getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
|
||||
final String followerIndexSettings =
|
||||
getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
||||
|
||||
ensureGreen("index1", "index2");
|
||||
|
||||
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
|
||||
followRequest.setLeaderIndex("index1");
|
||||
followRequest.setFollowIndex("index2");
|
||||
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get();
|
||||
|
||||
final int numDocs = randomIntBetween(2, 64);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
builder.field("field", "value");
|
||||
builder.startArray("objects");
|
||||
{
|
||||
builder.startObject();
|
||||
builder.field("field", i);
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(builder).get();
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
int value = i;
|
||||
assertBusy(() -> {
|
||||
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
|
||||
assertTrue(getResponse.isExists());
|
||||
assertTrue((getResponse.getSource().containsKey("field")));
|
||||
assertThat(XContentMapValues.extractValue("objects.field", getResponse.getSource()),
|
||||
equalTo(Collections.singletonList(value)));
|
||||
});
|
||||
}
|
||||
|
||||
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
|
||||
unfollowRequest.setFollowIndex("index2");
|
||||
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
|
||||
|
||||
assertBusy(() -> {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(0));
|
||||
});
|
||||
}
|
||||
|
||||
public void testFollowNonExistentIndex() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("test-leader").get());
|
||||
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
|
||||
|
@ -319,4 +376,55 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
return settings;
|
||||
}
|
||||
|
||||
private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards,
|
||||
final Map<String, String> additionalIndexSettings) throws IOException {
|
||||
final String settings;
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("settings");
|
||||
{
|
||||
builder.field("index.number_of_shards", numberOfPrimaryShards);
|
||||
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
||||
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
builder.startObject("mappings");
|
||||
{
|
||||
builder.startObject("doc");
|
||||
{
|
||||
builder.startObject("properties");
|
||||
{
|
||||
builder.startObject("objects");
|
||||
{
|
||||
builder.field("type", "nested");
|
||||
builder.startObject("properties");
|
||||
{
|
||||
builder.startObject("field");
|
||||
{
|
||||
builder.field("type", "long");
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
builder.startObject("field");
|
||||
{
|
||||
builder.field("type", "keyword");
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
settings = BytesReference.bytes(builder).utf8ToString();
|
||||
}
|
||||
return settings;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,8 +9,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -20,7 +18,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
|
@ -33,7 +31,6 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
|
|||
final Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.translog.generation_threshold_size", new ByteSizeValue(randomIntBetween(8, 64), ByteSizeUnit.KB))
|
||||
.build();
|
||||
final IndexService indexService = createIndex("index", settings);
|
||||
|
||||
|
@ -48,28 +45,23 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
|
|||
for (int iter = 0; iter < iters; iter++) {
|
||||
int min = randomIntBetween(0, numWrites - 1);
|
||||
int max = randomIntBetween(min, numWrites - 1);
|
||||
|
||||
final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
|
||||
/*
|
||||
* We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
|
||||
* generations) so the best we can assert is that we see the expected operations.
|
||||
*/
|
||||
final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet());
|
||||
final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
|
||||
final List<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toList());
|
||||
final List<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList());
|
||||
assertThat(seenSeqNos, equalTo(expectedSeqNos));
|
||||
}
|
||||
|
||||
// get operations for a range where no operations exist:
|
||||
Exception e = expectThrows(IllegalStateException.class,
|
||||
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE));
|
||||
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
|
||||
(numWrites + 1) +"] found, tracker checkpoint ["));
|
||||
assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + numWrites + "] and max_seqno [" +
|
||||
(numWrites + 1) +"] found"));
|
||||
|
||||
// get operations for a range where some operations do not exist:
|
||||
e = expectThrows(IllegalStateException.class,
|
||||
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE));
|
||||
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
|
||||
(numWrites + 10) +"] found, tracker checkpoint ["));
|
||||
assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + (numWrites - 10) + "] and max_seqno [" +
|
||||
(numWrites + 10) +"] found"));
|
||||
}
|
||||
|
||||
public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue