Fill LocalCheckpointTracker with Lucene commit (#34474)
Today we rely on the LocalCheckpointTracker to prevent duplicates when the optimization based on max_seq_no_of_updates is enabled. The problem is that the LocalCheckpointTracker is not fully reloaded when an engine is opened from an out-of-order index commit. Suppose the starting commit contains seq#0 and seq#2: the current LocalCheckpointTracker would answer "false" when asked whether seq#2 was processed before, even though seq#2 is in the commit. This change scans the sequence numbers present in the starting commit, then marks them as completed in the LocalCheckpointTracker, ensuring a consistent state between the LocalCheckpointTracker and the Lucene commit.
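To make the failure concrete, here is a small sketch of the scenario (illustrative only; it hand-wires the LocalCheckpointTracker(maxSeqNo, localCheckpoint) constructor and the contains/markSeqNoAsCompleted/getCheckpoint methods that appear in this change, rather than going through the engine):

// Starting commit contains seq#0 and seq#2 but not seq#1, so its user data records
// max_seq_no=2 and local_checkpoint=0 (the highest seq_no below which there is no gap).
LocalCheckpointTracker tracker = new LocalCheckpointTracker(2, 0);

// Before this change: the tracker only knows that everything <= checkpoint is complete,
// so seq#2 looks unprocessed even though it is present in the Lucene commit.
assert tracker.contains(2) == false; // inconsistent with Lucene

// After this change: the engine scans the commit for seq_nos in (localCheckpoint, maxSeqNo]
// and replays them into the tracker via Lucene.scanSeqNosInReader's callback.
tracker.markSeqNoAsCompleted(2);
assert tracker.contains(2);          // now consistent with the commit
assert tracker.getCheckpoint() == 0; // checkpoint unchanged: seq#1 is still missing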
Parent: c476f91811
Commit: 90ca5b1fde
Lucene.java
@@ -27,6 +27,7 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
@@ -42,6 +43,7 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
@@ -82,6 +84,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -91,6 +94,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongConsumer;
 
 public class Lucene {
     public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -968,4 +972,39 @@ public class Lucene {
     public static NumericDocValuesField newSoftDeletesField() {
         return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
     }
+
+    /**
+     * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive)
+     * and {@code toSeqNo} (inclusive) in the provided directory reader. This method invokes the callback
+     * {@code onNewSeqNo} whenever a sequence number value is found.
+     *
+     * @param directoryReader the directory reader to scan
+     * @param fromSeqNo       the lower bound of a range of seq_no to scan (inclusive)
+     * @param toSeqNo         the upper bound of a range of seq_no to scan (inclusive)
+     * @param onNewSeqNo      the callback to be called whenever a new valid sequence number is found
+     */
+    public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
+                                          LongConsumer onNewSeqNo) throws IOException {
+        final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
+        final IndexSearcher searcher = new IndexSearcher(reader);
+        searcher.setQueryCache(null);
+        final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
+        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
+        for (LeafReaderContext leaf : reader.leaves()) {
+            final Scorer scorer = weight.scorer(leaf);
+            if (scorer == null) {
+                continue;
+            }
+            final DocIdSetIterator docIdSetIterator = scorer.iterator();
+            final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+            int docId;
+            while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
+                    throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
+                }
+                final long seqNo = seqNoDocValues.longValue();
+                assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
+                onNewSeqNo.accept(seqNo);
+            }
+        }
+    }
 }
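For readers unfamiliar with the point-query-plus-doc-values pattern that scanSeqNosInReader relies on, the following is a self-contained sketch of the same technique against a bare Lucene index (Lucene 8.x APIs, which this change compiles against; the field name "seq_no", ByteBuffersDirectory, and the class wrapper are assumptions of this sketch, and it skips the wrapAllDocsLive wrapping since the toy index has no soft deletes):

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class SeqNoScanSketch {
    public static void main(String[] args) throws IOException {
        try (Directory dir = new ByteBuffersDirectory()) {
            // Index an "out-of-order commit": seq#0 and seq#2 are present, seq#1 is missing.
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (long seqNo : new long[]{0L, 2L}) {
                    Document doc = new Document();
                    doc.add(new LongPoint("seq_no", seqNo));             // indexed: enables range queries
                    doc.add(new NumericDocValuesField("seq_no", seqNo)); // doc values: per-doc retrieval
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                searcher.setQueryCache(null); // a one-shot scan gains nothing from caching
                // Scan (localCheckpoint, maxSeqNo] = (0, 2], mirroring the engine's call.
                Query query = LongPoint.newRangeQuery("seq_no", 1L, 2L);
                Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
                for (LeafReaderContext leaf : reader.leaves()) {
                    Scorer scorer = weight.scorer(leaf);
                    if (scorer == null) {
                        continue; // no match in this segment
                    }
                    NumericDocValues dv = leaf.reader().getNumericDocValues("seq_no");
                    DocIdSetIterator it = scorer.iterator();
                    for (int docId = it.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = it.nextDoc()) {
                        if (dv != null && dv.advanceExact(docId)) {
                            System.out.println("found seq_no=" + dv.longValue()); // prints 2
                        }
                    }
                }
            }
        }
    }
}

The two "seq_no" fields share a name but serve different roles: the LongPoint makes the range filter cheap, while the doc values column hands back the matched value without loading stored fields.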
InternalEngine.java
@@ -102,6 +102,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 public class InternalEngine extends Engine {
@@ -189,7 +190,6 @@ public class InternalEngine extends Engine {
             translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
             assert translog.getGeneration() != null;
             this.translog = translog;
-            this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
             this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
             this.softDeletesPolicy = newSoftDeletesPolicy();
             this.combinedDeletionPolicy =
@@ -223,6 +223,8 @@ public class InternalEngine extends Engine {
             for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
                 this.internalSearcherManager.addListener(listener);
             }
+            this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
+                () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
             this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
             this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
             success = true;
@@ -238,16 +240,29 @@
         logger.trace("created new InternalEngine");
     }
 
-    private LocalCheckpointTracker createLocalCheckpointTracker(
-        BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
-        final long maxSeqNo;
-        final long localCheckpoint;
-        final SequenceNumbers.CommitInfo seqNoStats =
-            SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
-        maxSeqNo = seqNoStats.maxSeqNo;
-        localCheckpoint = seqNoStats.localCheckpoint;
-        logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
-        return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
+    private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
+        Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
+        try {
+            final SequenceNumbers.CommitInfo seqNoStats =
+                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
+            final long maxSeqNo = seqNoStats.maxSeqNo;
+            final long localCheckpoint = seqNoStats.localCheckpoint;
+            logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
+            final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
+            // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
+            // create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
+            // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
+            // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
+            // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
+            if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
+                try (Searcher searcher = searcherSupplier.get()) {
+                    Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted);
+                }
+            }
+            return tracker;
+        } catch (IOException ex) {
+            throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
+        }
     }
 
     private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -665,6 +680,8 @@
         } else if (op.seqNo() > docAndSeqNo.seqNo) {
             status = OpVsLuceneDocStatus.OP_NEWER;
         } else if (op.seqNo() == docAndSeqNo.seqNo) {
+            assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
+                "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
             // load term to tie break
             final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
             if (op.primaryTerm() > existingTerm) {
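The comment block in createLocalCheckpointTracker names an alternative the change deliberately rejects: instead of rebuilding the tracker bit by bit, recovery could bootstrap max_seq_no_of_updates (MSU) from the commit and thereby switch the optimization off. A rough sketch of that rejected option, for contrast (illustrative only; advanceMaxSeqNoOfUpdatesOrDeletes is the engine method of this era, but this wiring is not code from the commit):

// Rejected alternative (sketch): seed the MSU with the commit's max_seq_no so that no
// replayed operation at or below it can take the optimized, lookup-free indexing path.
final SequenceNumbers.CommitInfo seqNoStats =
    SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.maxSeqNo);
// Every op with seq_no <= MSU is then treated as a potential update and pays a Lucene
// lookup, trading recovery-time indexing speed for tracker simplicity.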
InternalEngineTests.java
@@ -5081,6 +5081,77 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    public void testRebuildLocalCheckpointTracker() throws Exception {
+        Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+        final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        Path translogPath = createTempDir();
+        int numOps = scaledRandomIntBetween(1, 500);
+        List<Engine.Operation> operations = new ArrayList<>();
+        for (int i = 0; i < numOps; i++) {
+            long seqNo = i;
+            final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
+            if (randomBoolean()) {
+                operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+            } else if (randomBoolean()) {
+                operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+            } else {
+                operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
+                    threadPool.relativeTimeInMillis(), "test-" + i));
+            }
+        }
+        Randomness.shuffle(operations);
+        List<List<Engine.Operation>> commits = new ArrayList<>();
+        commits.add(new ArrayList<>());
+        try (Store store = createStore()) {
+            EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
+            try (InternalEngine engine = createEngine(config)) {
+                List<Engine.Operation> flushedOperations = new ArrayList<>();
+                for (Engine.Operation op : operations) {
+                    flushedOperations.add(op);
+                    if (op instanceof Engine.Index) {
+                        engine.index((Engine.Index) op);
+                    } else if (op instanceof Engine.Delete) {
+                        engine.delete((Engine.Delete) op);
+                    } else {
+                        engine.noOp((Engine.NoOp) op);
+                    }
+                    if (randomInt(100) < 10) {
+                        engine.refresh("test");
+                    }
+                    if (randomInt(100) < 5) {
+                        engine.flush();
+                        commits.add(new ArrayList<>(flushedOperations));
+                        globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                    }
+                }
+                globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                engine.syncTranslog();
+            }
+            trimUnsafeCommits(config);
+            List<Engine.Operation> safeCommit = null;
+            for (int i = commits.size() - 1; i >= 0; i--) {
+                if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
+                    safeCommit = commits.get(i);
+                    break;
+                }
+            }
+            assertThat(safeCommit, notNullValue());
+            try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
+                final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
+                for (Engine.Operation op : operations) {
+                    assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
+                        tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
+                }
+            }
+        }
+    }
+
     static void trimUnsafeCommits(EngineConfig config) throws IOException {
         final Store store = config.getStore();
         final TranslogConfig translogConfig = config.getTranslogConfig();
RecoveryDuringReplicationTests.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexableField;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkShardRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -61,6 +62,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.anyOf;
@@ -681,6 +683,50 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
         }
     }
 
+    public void testAddNewReplicas() throws Exception {
+        try (ReplicationGroup shards = createGroup(between(0, 1))) {
+            shards.startAll();
+            Thread[] threads = new Thread[between(1, 3)];
+            AtomicBoolean isStopped = new AtomicBoolean();
+            boolean appendOnly = randomBoolean();
+            AtomicInteger docId = new AtomicInteger();
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(() -> {
+                    while (isStopped.get() == false) {
+                        try {
+                            if (appendOnly) {
+                                String id = randomBoolean() ? Integer.toString(docId.incrementAndGet()) : null;
+                                shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
+                            } else if (frequently()) {
+                                String id = Integer.toString(frequently() ? docId.incrementAndGet() : between(0, 10));
+                                shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
+                            } else {
+                                String id = Integer.toString(between(0, docId.get()));
+                                shards.delete(new DeleteRequest(index.getName(), "type", id));
+                            }
+                            if (randomInt(100) < 10) {
+                                shards.getPrimary().flush(new FlushRequest());
+                            }
+                        } catch (Exception ex) {
+                            throw new AssertionError(ex);
+                        }
+                    }
+                });
+                threads[i].start();
+            }
+            assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50)));
+            shards.getPrimary().sync();
+            IndexShard newReplica = shards.addReplica();
+            shards.recoverReplica(newReplica);
+            assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100)));
+            isStopped.set(true);
+            for (Thread thread : threads) {
+                thread.join();
+            }
+            assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary()))));
+        }
+    }
+
     public static class BlockingTarget extends RecoveryTarget {
 
         private final CountDownLatch recoveryBlocked;
IndexFollowingIT.java
@@ -691,6 +691,64 @@ public class IndexFollowingIT extends CCRIntegTestCase {
         assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
     }
 
+    public void testAddNewReplicasOnFollower() throws Exception {
+        int numberOfReplicas = between(0, 1);
+        String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
+        PutFollowAction.Request follow = follow("leader-index", "follower-index");
+        followerClient().execute(PutFollowAction.INSTANCE, follow).get();
+        getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
+        ensureFollowerGreen("follower-index");
+        AtomicBoolean stopped = new AtomicBoolean();
+        AtomicInteger docID = new AtomicInteger();
+        boolean appendOnly = randomBoolean();
+        Thread indexingOnLeader = new Thread(() -> {
+            while (stopped.get() == false) {
+                try {
+                    if (appendOnly) {
+                        String id = Integer.toString(docID.incrementAndGet());
+                        leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
+                    } else if (frequently()) {
+                        String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100));
+                        leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
+                    } else {
+                        String id = Integer.toString(between(0, docID.get()));
+                        leaderClient().prepareDelete("leader-index", "doc", id).get();
+                    }
+                } catch (Exception ex) {
+                    throw new AssertionError(ex);
+                }
+            }
+        });
+        indexingOnLeader.start();
+        Thread flushingOnFollower = new Thread(() -> {
+            while (stopped.get() == false) {
+                try {
+                    if (rarely()) {
+                        followerClient().admin().indices().prepareFlush("follower-index").get();
+                    }
+                    if (rarely()) {
+                        followerClient().admin().indices().prepareRefresh("follower-index").get();
+                    }
+                } catch (Exception ex) {
+                    throw new AssertionError(ex);
+                }
+            }
+        });
+        flushingOnFollower.start();
+        atLeastDocsIndexed(followerClient(), "follower-index", 50);
+        followerClient().admin().indices().prepareUpdateSettings("follower-index")
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
+        ensureFollowerGreen("follower-index");
+        atLeastDocsIndexed(followerClient(), "follower-index", 100);
+        stopped.set(true);
+        flushingOnFollower.join();
+        indexingOnLeader.join();
+        assertSameDocCount("leader-index", "follower-index");
+        unfollowIndex("follower-index");
+    }
+
     private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
         return () -> {
             final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
ShardFollowTaskReplicationTests.java
@@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -31,6 +32,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -40,12 +42,14 @@ import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
 import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -281,6 +285,41 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
         }
     }
 
+    public void testAddNewFollowingReplica() throws Exception {
+        final byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
+        final int numDocs = between(1, 100);
+        final List<Translog.Operation> operations = new ArrayList<>(numDocs);
+        for (int i = 0; i < numDocs; i++) {
+            operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1));
+        }
+        Future<Void> recoveryFuture = null;
+        try (ReplicationGroup group = createFollowGroup(between(0, 1))) {
+            group.startAll();
+            while (operations.isEmpty() == false) {
+                List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations);
+                operations.removeAll(bulkOps);
+                BulkShardOperationsRequest bulkRequest = new BulkShardOperationsRequest(group.getPrimary().shardId(),
+                    group.getPrimary().getHistoryUUID(), bulkOps, -1);
+                new CCRAction(bulkRequest, new PlainActionFuture<>(), group).execute();
+                if (randomInt(100) < 10) {
+                    group.getPrimary().flush(new FlushRequest());
+                }
+                if (recoveryFuture == null && (randomInt(100) < 10 || operations.isEmpty())) {
+                    group.getPrimary().sync();
+                    IndexShard newReplica = group.addReplica();
+                    // We need to recover the replica async to release the main thread for the following task to fill missing
+                    // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for.
+                    recoveryFuture = group.asyncRecoverReplica(newReplica,
+                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {});
+                }
+            }
+            if (recoveryFuture != null) {
+                recoveryFuture.get();
+            }
+            group.assertAllEqual(numDocs);
+        }
+    }
+
     @Override
     protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
         Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)