Introduce sequence-number-based recovery

This commit introduces sequence-number-based recovery. When a replica
has fallen out of sync, rather than performing a file-based recovery we
first attempt to replay operations since the last local checkpoint on
the replica. To do this, at the start of recovery the replica tells the
primary what its local checkpoint is. The primary will then wait for all
operations between that local checkpoint and the current maximum
sequence number to complete; this is to ensure that there are no gaps in
the operations that will be replayed from the primary to the
replica. This is a best-effort attempt as we currently have no
guarantees on the primary that these operations will be available; if we
are not able to replay all operations in the desired range, we simply
fall back to file-based recovery. Later work will strengthen these
guarantees.

Relates #22484
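
As a rough sketch of the decision described above (hypothetical, self-contained code; none of these names come from Elasticsearch):

public class RecoveryModeSketch {

    static final long UNASSIGNED_SEQ_NO = -2L;

    enum Mode { SEQUENCE_NUMBER_BASED, FILE_BASED }

    /**
     * @param targetLocalCheckpoint the local checkpoint the replica reports, or UNASSIGNED_SEQ_NO if it has no usable commit
     * @param sourceMaxSeqNo        the current maximum sequence number on the primary
     * @param translogHasRange      whether the primary retains all operations in (targetLocalCheckpoint, sourceMaxSeqNo]
     */
    static Mode chooseRecoveryMode(final long targetLocalCheckpoint, final long sourceMaxSeqNo, final boolean translogHasRange) {
        if (targetLocalCheckpoint == UNASSIGNED_SEQ_NO) {
            return Mode.FILE_BASED; // no starting point: copy segment files
        }
        // best effort: replay only if every operation in the desired range is still available
        return translogHasRange && targetLocalCheckpoint <= sourceMaxSeqNo ? Mode.SEQUENCE_NUMBER_BASED : Mode.FILE_BASED;
    }

    public static void main(String[] args) {
        System.out.println(chooseRecoveryMode(41, 57, true)); // SEQUENCE_NUMBER_BASED
        System.out.println(chooseRecoveryMode(UNASSIGNED_SEQ_NO, 57, true)); // FILE_BASED
    }
}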
Jason Tedor 2017-01-27 08:16:38 -08:00 committed by GitHub
parent 417c93c570
commit 930282e161
33 changed files with 1271 additions and 557 deletions

View File

@ -408,7 +408,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]PeerRecoverySourceService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />

View File

@ -36,7 +36,7 @@ import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
@ -115,7 +115,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndexSettings.MAX_SLICES_PER_SCROLL,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.store.AlreadyClosedException;

View File

@ -379,6 +379,7 @@ public abstract class Engine implements Closeable {
void freeze() {
freeze.set(true);
}
}
public static class IndexResult extends Result {

View File

@ -63,6 +63,7 @@ import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
@ -119,8 +120,6 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we are throttling
@ -159,11 +158,12 @@ public class InternalEngine extends Engine {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsFromLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbersService.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
@ -353,47 +353,6 @@ public class InternalEngine extends Engine {
return null;
}
/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog
* checkpoint (global checkpoint).
*
* @param translogConfig the translog config (for the global checkpoint)
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
final TranslogConfig translogConfig,
final IndexWriter indexWriter) throws IOException {
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter);
}
/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the
* specified global checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue());
}
}
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
@ -793,7 +752,6 @@ public class InternalEngine extends Engine {
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
@ -1532,11 +1490,11 @@ public class InternalEngine extends Engine {
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}

View File

@ -22,25 +22,27 @@ package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
This class is responsible for tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
equal) sequence numbers have been processed on all shards that are currently active. Since shards count as "active" when the master starts
them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
have received all old operations via the recovery mechanism and are kept up to date by the various replication actions. The set of
shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
@ -63,14 +65,14 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
private long globalCheckpoint;
/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is tracking local checkpoints for
* @param shardId the shard ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
@ -127,8 +129,9 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
+ "] is lower than previous one [" + globalCheckpoint + "]");
final String message =
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
throw new IllegalStateException(message);
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);

View File

@ -20,18 +20,17 @@
package org.elasticsearch.index.seqno;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.LinkedList;
/**
* This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which all previous
* sequence numbers have been processed (inclusive).
This class generates sequence numbers and keeps track of the so-called "local checkpoint", which is the highest number for which all
previous sequence numbers have been processed (inclusive).
*/
public class LocalCheckpointService extends AbstractIndexShardComponent {
public class LocalCheckpointTracker {
/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
@ -67,17 +66,15 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
private volatile long nextSeqNo;
/**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint for this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint,
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
*
* @param shardId the shard this service is providing tracking local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
*/
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
super(shardId, indexSettings);
public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
@ -107,7 +104,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
*
* @param seqNo the sequence number to mark as completed
*/
synchronized void markSeqNoAsCompleted(final long seqNo) {
public synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
@ -142,10 +139,25 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
return nextSeqNo - 1;
}
/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
@SuppressForbidden(reason = "Object#wait")
synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
while (checkpoint < seqNo) {
// notified by updateCheckpoint
this.wait();
}
}
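
The wait here pairs with the notifyAll added to updateCheckpoint below; a self-contained sketch of that monitor contract (hypothetical code, simplified to in-order completion):

class WaitForOpsSketch {

    private long checkpoint = -1;

    synchronized void markSeqNoAsCompleted(final long seqNo) {
        // simplification: assume sequence numbers complete in order
        checkpoint = Math.max(checkpoint, seqNo);
        notifyAll(); // wake any thread blocked in waitForOpsToComplete
    }

    synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
        while (checkpoint < seqNo) {
            wait(); // the monitor is released while waiting and re-checked on wakeup
        }
    }

    public static void main(String[] args) throws Exception {
        final WaitForOpsSketch sketch = new WaitForOpsSketch();
        final Thread writer = new Thread(() -> {
            for (long seqNo = 0; seqNo <= 5; seqNo++) {
                sketch.markSeqNoAsCompleted(seqNo);
            }
        });
        writer.start();
        sketch.waitForOpsToComplete(5); // returns once the writer has completed 0..5
        System.out.println("all operations up to 5 completed");
    }
}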
/**
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* current checkpoint is processed.
*/
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
@ -154,19 +166,24 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
current = processedSeqNo.peekFirst();
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
current = processedSeqNo.peekFirst();
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
} finally {
// notifies waiters in waitForOpsToComplete
this.notifyAll();
}
}
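
To make the window arithmetic above concrete, here is a tiny model (hypothetical code; the real class uses Lucene's FixedBitSet and a much larger, configurable chunk size) of a chunked bit-set window: one bit per pending sequence number, with the leading chunk dropped once the checkpoint passes its last bit:

import java.util.BitSet;
import java.util.LinkedList;

class CheckpointWindowSketch {

    static final int BIT_ARRAYS_SIZE = 4; // tiny chunk size for demonstration only
    final LinkedList<BitSet> processedSeqNo = new LinkedList<>();
    long firstProcessedSeqNo = 0;
    long checkpoint = -1; // NO_OPS_PERFORMED

    synchronized void markSeqNoAsCompleted(final long seqNo) {
        while (seqNo >= firstProcessedSeqNo + (long) processedSeqNo.size() * BIT_ARRAYS_SIZE) {
            processedSeqNo.add(new BitSet(BIT_ARRAYS_SIZE)); // grow the window on demand
        }
        final long offset = seqNo - firstProcessedSeqNo;
        processedSeqNo.get((int) (offset / BIT_ARRAYS_SIZE)).set((int) (offset % BIT_ARRAYS_SIZE));
        // advance the checkpoint over consecutively completed sequence numbers
        while (!processedSeqNo.isEmpty()
                && processedSeqNo.getFirst().get((int) (checkpoint + 1 - firstProcessedSeqNo))) {
            checkpoint++;
            if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
                // the leading chunk is fully processed; drop it and slide the window
                processedSeqNo.removeFirst();
                firstProcessedSeqNo += BIT_ARRAYS_SIZE;
            }
        }
    }

    public static void main(String[] args) {
        final CheckpointWindowSketch tracker = new CheckpointWindowSketch();
        tracker.markSeqNoAsCompleted(0);
        tracker.markSeqNoAsCompleted(2); // gap at 1 holds the checkpoint back
        System.out.println(tracker.checkpoint); // prints 0
        tracker.markSeqNoAsCompleted(1); // fill the gap
        System.out.println(tracker.checkpoint); // prints 2
    }
}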
/**

View File

@ -30,7 +30,7 @@ import java.io.IOException;
public class SeqNoStats implements ToXContent, Writeable {
private static final String SEQ_NO = "seq_no";
private static final String MAX_SEQ_NO = "max";
private static final String MAX_SEQ_NO = "max_seq_no";
private static final String LOCAL_CHECKPOINT = "local_checkpoint";
private static final String GLOBAL_CHECKPOINT = "global_checkpoint";

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import java.util.Map;
/**
* A utility class for handling sequence numbers.
*/
public class SequenceNumbers {
public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
public static final String MAX_SEQ_NO = "max_seq_no";
/**
* Reads the sequence number stats from the commit data (maximum sequence number and local checkpoint) and uses the specified global
* checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param commitData the commit data
* @return the sequence number stats
*/
public static SeqNoStats loadSeqNoStatsFromLuceneCommit(
final long globalCheckpoint,
final Iterable<Map.Entry<String, String>> commitData) {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
for (final Map.Entry<String, String> entry : commitData) {
final String key = entry.getKey();
if (key.equals(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : maxSeqNo;
maxSeqNo = Long.parseLong(entry.getValue());
}
}
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
}

View File

@ -40,8 +40,8 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public static final long NO_OPS_PERFORMED = -1L;
private final LocalCheckpointService localCheckpointService;
private final GlobalCheckpointService globalCheckpointService;
private final LocalCheckpointTracker localCheckpointTracker;
private final GlobalCheckpointTracker globalCheckpointTracker;
/**
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
@ -62,8 +62,8 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint);
}
/**
@ -73,26 +73,36 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the next assigned sequence number
*/
public long generateSeqNo() {
return localCheckpointService.generateSeqNo();
return localCheckpointTracker.generateSeqNo();
}
/**
* The maximum sequence number issued so far. See {@link LocalCheckpointService#getMaxSeqNo()} for additional details.
* The maximum sequence number issued so far. See {@link LocalCheckpointTracker#getMaxSeqNo()} for additional details.
*
* @return the maximum sequence number
*/
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
return localCheckpointTracker.getMaxSeqNo();
}
/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}
/**
* Marks the processing of the provided sequence number as completed and updates the checkpoint if possible.
* See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} for additional details.
* See {@link LocalCheckpointTracker#markSeqNoAsCompleted(long)} for additional details.
*
* @param seqNo the sequence number to mark as completed
*/
public void markSeqNoAsCompleted(final long seqNo) {
localCheckpointService.markSeqNoAsCompleted(seqNo);
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
/**
@ -106,23 +116,23 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See
* {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
* {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for details.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
*/
public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
globalCheckpointService.updateLocalCheckpoint(allocationId, checkpoint);
globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
}
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointService#markAllocationIdAsInSync(String)} for additional details.
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
*/
public void markAllocationIdAsInSync(final String allocationId) {
globalCheckpointService.markAllocationIdAsInSync(allocationId);
globalCheckpointTracker.markAllocationIdAsInSync(allocationId);
}
/**
@ -131,7 +141,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the local checkpoint
*/
public long getLocalCheckpoint() {
return localCheckpointService.getCheckpoint();
return localCheckpointTracker.getCheckpoint();
}
/**
@ -140,7 +150,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the global checkpoint
*/
public long getGlobalCheckpoint() {
return globalCheckpointService.getCheckpoint();
return globalCheckpointTracker.getCheckpoint();
}
/**
@ -150,7 +160,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
return globalCheckpointTracker.updateCheckpointOnPrimary();
}
/**
@ -159,18 +169,18 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @param checkpoint the global checkpoint
*/
public void updateGlobalCheckpointOnReplica(final long checkpoint) {
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
globalCheckpointTracker.updateCheckpointOnReplica(checkpoint);
}
/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}
}

View File

@ -100,7 +100,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.GlobalCheckpointService;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
@ -1368,7 +1368,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See
* {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
* {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for details.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
@ -1378,9 +1378,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
getEngine().seqNoService().updateLocalCheckpointForShard(allocationId, checkpoint);
}
/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
getEngine().seqNoService().waitForOpsToComplete(seqNo);
}
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointService#markAllocationIdAsInSync(String)} for additional details.
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
*/
@ -1430,7 +1440,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
@ -75,6 +76,8 @@ import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@ -207,6 +210,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
/**
* Loads the local checkpoint and the maximum sequence number from the latest Lucene commit point and returns the triplet of local and
* global checkpoints, and maximum sequence number as an instance of {@link SeqNoStats}. The global checkpoint must be provided
* externally as it is not stored in the commit point.
*
* @param globalCheckpoint the provided global checkpoint
* @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, userData.entrySet());
}
final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
throw new AlreadyClosedException("store is already closed");

View File

@ -778,6 +778,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Source getSource();
long seqNo();
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
@ -922,6 +924,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.source;
}
@Override
public long seqNo() {
return seqNo;
}
@ -1072,6 +1075,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.uid;
}
@Override
public long seqNo() {
return seqNo;
}
@ -1147,6 +1151,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final long primaryTerm;
private final String reason;
@Override
public long seqNo() {
return seqNo;
}

View File

@ -53,7 +53,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.GlobalCheckpointService;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -739,7 +739,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -48,6 +49,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
@ -59,6 +61,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -124,11 +127,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
/**
* cancel all ongoing recoveries for the given shard, if their status match a predicate
* Cancel all ongoing recoveries for the given shard.
*
* @param reason reason for cancellation
* @param shardId shardId for which to cancel recoveries
* @return true if a recovery was cancelled
* @param reason reason for cancellation
* @param shardId shard ID for which to cancel recoveries
* @return {@code true} if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason);
@ -152,7 +155,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
retryRecovery(recoveryId, retryAfter, activityTimeout);
}
private void retryRecovery(final long recoveryId, TimeValue retryAfter, TimeValue activityTimeout) {
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
if (newTarget != null) {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
@ -166,50 +169,21 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId);
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
}
RecoveryTarget recoveryTarget = recoveryRef.target();
assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryTarget.sourceNode());
Store.MetadataSnapshot metadataSnapshot;
try {
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running
// into concurrency issues with the primary changing files underneath us.
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} else {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
}
logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size());
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
logger.trace("{} shard folder empty, recover all files", recoveryTarget);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while listing local files, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.target();
cancellableThreads = recoveryTarget.cancellableThreads();
timer = recoveryTarget.state().getTimer();
try {
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
request = getStartRecoveryRequest(recoveryTarget);
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
recoveryTarget.indexShard().prepareForIndexRecovery();
request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
cancellableThreads = recoveryTarget.CancellableThreads();
timer = recoveryTarget.state().getTimer();
} catch (Exception e) {
} catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
return;
}
@ -227,7 +201,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
}).txGet()));
final RecoveryResponse recoveryResponse = responseHolder.get();
assert responseHolder != null;
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
@ -286,22 +259,23 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
cause instanceof ShardNotFoundException) {
// if the target is not ready yet, retry
retryRecovery(recoveryId, "remote shard not ready", recoverySettings.retryDelayStateSync(),
retryRecovery(
recoveryId,
"remote shard not ready",
recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout());
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
recoverySettings.activityTimeout());
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout());
return;
}
@ -310,10 +284,96 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
}
}
/**
* Obtains a snapshot of the store metadata for the recovery target.
*
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
try {
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the
// primary changing files underneath us
return Store.MetadataSnapshot.EMPTY;
} else {
return recoveryTarget.indexShard().snapshotStoreMetadata();
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
return Store.MetadataSnapshot.EMPTY;
} catch (final IOException e) {
logger.warn("error while listing local files, recovering as if there are none", e);
return Store.MetadataSnapshot.EMPTY;
}
}
/**
* Prepare the start recovery request.
*
* @param recoveryTarget the target of the recovery
* @return a start recovery request
*/
private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
startingSeqNo = getStartingSeqNo(recoveryTarget);
} else {
startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
} else {
logger.trace(
"{} preparing for sequence-number-based recovery starting at local checkpoint [{}] from [{}]",
recoveryTarget.shardId(),
startingSeqNo,
recoveryTarget.sourceNode());
}
request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.sourceNode(),
clusterService.localNode(),
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
return request;
}
/**
* Get the starting sequence number for a sequence-number-based request.
*
* @param recoveryTarget the target of the recovery
* @return the starting sequence number or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog());
return recoveryTarget.store().loadSeqNoStats(globalCheckpoint).getLocalCheckpoint() + 1;
} catch (final IOException e) {
// this can happen, for example, if phase one of the recovery completed successfully, a network partition happens before the
// translog on the recovery target is opened, the recovery enters a retry loop, sees now that the index files are on disk, and
// proceeds to attempt a sequence-number-based recovery
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
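
The arithmetic here is simply: resume at the local checkpoint persisted in the target's last Lucene commit, plus one; any failure to read that state forces a file-based recovery. A hypothetical illustration:

class StartingSeqNoExample {

    static final long UNASSIGNED_SEQ_NO = -2L; // mirrors SequenceNumbersService.UNASSIGNED_SEQ_NO

    static long startingSeqNo(final long localCheckpointFromLastCommit, final boolean commitReadable) {
        // on any I/O failure reading the commit or translog checkpoint, fall back to file-based recovery
        return commitReadable ? localCheckpointFromLastCommit + 1 : UNASSIGNED_SEQ_NO;
    }

    public static void main(String[] args) {
        System.out.println(startingSeqNo(41, true));  // 42: the first operation the target is missing
        System.out.println(startingSeqNo(41, false)); // -2: forces file-based recovery
    }
}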
public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);

View File

@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
@ -82,14 +83,13 @@ public class RecoveriesCollection {
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout));
}
/**
* Resets the recovery and performs a recovery restart on the currently recovering index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created RecoveryTarget
*/
public RecoveryTarget resetRecovery(final long recoveryId, TimeValue activityTimeout) {
public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
RecoveryTarget oldRecoveryTarget = null;
final RecoveryTarget newRecoveryTarget;
@ -107,7 +107,7 @@ public class RecoveriesCollection {
}
// Closes the current recovery target
boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.CancellableThreads());
boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.cancellableThreads());
if (successfulReset) {
logger.trace("{} restarted recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(),
newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId());

View File

@ -41,6 +41,8 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
@ -124,47 +126,58 @@ public class RecoverySourceHandler {
* performs the recovery from the local engine to the target
*/
public RecoveryResponse recoverToTarget() throws IOException {
try (Translog.View translogView = shard.acquireTranslogView()) {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
}
try (final Translog.View translogView = shard.acquireTranslogView()) {
logger.trace("{} captured translog id [{}] for recovery", shard.shardId(), translogView.minTranslogGeneration());
try {
phase1(phase1Snapshot, translogView);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO &&
isTranslogReadyForSequenceNumberBasedRecovery(translogView);
if (!isSequenceNumberBasedRecoveryPossible) {
final IndexCommit phase1Snapshot;
try {
shard.releaseIndexCommit(phase1Snapshot);
} catch (IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
} catch (final IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
}
}
// engine was just started at the end of phase 1
try {
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
// engine was just started at the end of phase1
if (shard.state() == IndexShardState.RELOCATED) {
assert request.isPrimaryRelocation() == false :
"recovery target should not retry primary relocation if previous attempt made it past finalization step";
/**
/*
* The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all
* operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the
* local translog and thus will be resent on phase 2. The reason is that an operation replicated by the target primary is
* local translog and thus will be resent on phase2. The reason is that an operation replicated by the target primary is
* sent to the recovery target and the local shard (old primary) concurrently, meaning it may have arrived at the recovery
* target before we opened the engine and is still in-flight on the local shard.
*
* Checking the relocated status here, after we opened the engine on the target, is safe because primary relocation waits
* for all ongoing operations to complete and be fully replicated. Therefore all future operation by the new primary are
* guaranteed to reach the target shard when it's engine is open.
* guaranteed to reach the target shard when its engine is open.
*/
throw new IndexShardRelocatedException(request.shardId());
}
logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations());
logger.trace("{} snapshot translog for recovery; current size is [{}]", shard.shardId(), translogView.totalOperations());
try {
phase2(translogView.snapshot());
} catch (Exception e) {
@ -176,6 +189,49 @@ public class RecoverySourceHandler {
return response;
}
/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
*
* @param translogView a view of the translog on the source
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("{} starting: [{}], ending: [{}]", shard.shardId(), startingSeqNo, endingSeqNo);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= endingSeqNo) {
logger.trace(
"{} waiting for all operations in the range [{}, {}] to complete",
shard.shardId(),
startingSeqNo,
endingSeqNo);
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
final Translog.Snapshot snapshot = translogView.snapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsCompleted(operation.seqNo());
}
}
return tracker.getCheckpoint() >= endingSeqNo;
} else {
// norelease this can currently happen if a snapshot restore rolls the primary back to a previous commit point; in this
// situation the local checkpoint on the replica can be far in advance of the maximum sequence number on the primary violating
// all assumptions regarding local and global checkpoints
return false;
}
}
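
The check above seeds a LocalCheckpointTracker at startingSeqNo - 1, marks every sequence number found in the translog snapshot, and requires the resulting checkpoint to reach the current maximum; any gap leaves it short. A simplified, self-contained illustration of the same idea (hypothetical code):

import java.util.Arrays;

class TranslogReadinessSketch {

    static boolean isReady(final long startingSeqNo, final long endingSeqNo, final long[] seqNosInTranslog) {
        long checkpoint = startingSeqNo - 1; // seed at the target's local checkpoint
        final long[] sorted = seqNosInTranslog.clone();
        Arrays.sort(sorted); // simplification; the tracker handles out-of-order marking
        for (final long seqNo : sorted) {
            if (seqNo == checkpoint + 1) {
                checkpoint++; // consecutive: advance the checkpoint
            }
        }
        return checkpoint >= endingSeqNo; // any gap leaves the checkpoint short of the maximum
    }

    public static void main(String[] args) {
        System.out.println(isReady(42, 45, new long[]{42, 43, 44, 45})); // true
        System.out.println(isReady(42, 45, new long[]{42, 44, 45}));     // false: 43 is missing
    }
}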
/**
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
@ -237,7 +293,7 @@ public class RecoverySourceHandler {
response.phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}]," +
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exist in local store and has checksum [{}]," +
" size [{}]",
indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length());
}
@ -252,7 +308,7 @@ public class RecoverySourceHandler {
"[{}], local [{}]",
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote",
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exist in remote",
indexName, shardId, request.targetNode(), md.name());
}
response.phase1FileNames.add(md.name());
@ -329,8 +385,6 @@ public class RecoverySourceHandler {
}
}
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Exception e) {
@ -340,14 +394,12 @@ public class RecoverySourceHandler {
}
}
protected void prepareTargetForTranslog(final int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
stopWatch.stop();
@ -357,31 +409,34 @@ public class RecoverySourceHandler {
}
/**
* Perform phase2 of the recovery process
* Perform phase two of the recovery process.
* <p>
* Phase2 takes a snapshot of the current translog *without* acquiring the
* write lock (however, the translog snapshot is a point-in-time view of
* the translog). It then sends each translog operation to the target node
* so it can be replayed into the new shard.
* Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is a
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
* shard.
*
* @param snapshot a snapshot of the translog
*/
public void phase2(Translog.Snapshot snapshot) {
void phase2(final Translog.Snapshot snapshot) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
final StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
// Send all the snapshot's translog operations to the target
int totalOperations = sendSnapshot(snapshot);
// send all the snapshot's translog operations to the target
final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot);
stopWatch.stop();
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
}
/**
/*
* finalizes the recovery process
*/
public void finalizeRecovery() {
@ -410,7 +465,7 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
}
/**
/*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
@ -421,77 +476,73 @@ public class RecoverySourceHandler {
}
/**
* Send the given snapshot's operations to this handler's target node.
* Send the given snapshot's operations with a sequence number greater than or equal to the specified starting sequence number to
* this handler's target node.
* <p>
* Operations are bulked into a single request depending on an operation
* count limit or size-in-bytes limit
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the lower bound (inclusive) on the sequence numbers of the operations to send
* @param snapshot the translog snapshot to replay operations from
* @return the total number of translog operations that were sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected int sendSnapshot(final Translog.Snapshot snapshot) {
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
int ops = 0;
long size = 0;
int totalOperations = 0;
final List<Translog.Operation> operations = new ArrayList<>();
Translog.Operation operation;
try {
operation = snapshot.next(); // this ex should bubble up
} catch (IOException ex) {
throw new ElasticsearchException("failed to get next operation from translog", ex);
if (snapshot.totalOperations() == 0) {
logger.trace("[{}][{}] no translog operations to send to {}", indexName, shardId, request.targetNode());
}
if (operation == null) {
logger.trace("[{}][{}] no translog operations to send to {}",
indexName, shardId, request.targetNode());
}
while (operation != null) {
// send operations in batches
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
// skip ops that were never assigned a sequence number, and any ops below the starting sequence number
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
operations.add(operation);
ops += 1;
ops++;
size += operation.estimateSize();
totalOperations++;
// Check if this request is past bytes threshold, and
// if so, send it off
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
// don't throttle translog, since we lock for phase3 indexing,
// so we need to move it as fast as possible. Note, since we
// index docs to replicas while the index files are recovered
// the lock can potentially be removed, in which case, it might
// make sense to re-enable throttling in this phase
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
snapshot.totalOperations(),
request.targetNode());
indexName,
shardId,
ops,
new ByteSizeValue(size),
snapshot.totalOperations(),
request.targetNode());
}
ops = 0;
size = 0;
operations.clear();
}
try {
operation = snapshot.next(); // this ex should bubble up
} catch (IOException ex) {
throw new ElasticsearchException("failed to get next operation from translog", ex);
}
}
// send the leftover
// send the leftover operations
if (!operations.isEmpty()) {
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
}
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
snapshot.totalOperations(),
request.targetNode());
indexName,
shardId,
ops,
new ByteSizeValue(size),
snapshot.totalOperations(),
request.targetNode());
}
return totalOperations;
}
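A self-contained sketch of the batching and filtering contract implemented above; Op, Target, and the chunk size are hypothetical stand-ins, and the operation-count limit mentioned in the javadoc is omitted for brevity:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
final class SendSnapshotSketch {
    static final long UNASSIGNED_SEQ_NO = -2L; // stand-in for SequenceNumbersService.UNASSIGNED_SEQ_NO
    static final long CHUNK_SIZE_IN_BYTES = 512 * 1024; // arbitrary example threshold
    interface Op { long seqNo(); long estimateSize(); }
    interface Target { void indexOperations(List<Op> batch); }
    // sends ops with seqNo >= startingSeqNo in size-bounded batches and returns the number sent
    static int sendSnapshot(final long startingSeqNo, final Iterator<Op> snapshot, final Target target) {
        final List<Op> batch = new ArrayList<>();
        long batchSize = 0;
        int total = 0;
        while (snapshot.hasNext()) {
            final Op op = snapshot.next();
            // skip ops that never received a sequence number, and ops below the starting point
            if (op.seqNo() == UNASSIGNED_SEQ_NO || op.seqNo() < startingSeqNo) {
                continue;
            }
            batch.add(op);
            batchSize += op.estimateSize();
            total++;
            if (batchSize >= CHUNK_SIZE_IN_BYTES) { // flush the batch once it passes the size threshold
                target.indexOperations(new ArrayList<>(batch));
                batch.clear();
                batchSize = 0;
            }
        }
        if (batch.isEmpty() == false) { // send the leftover operations
            target.indexOperations(new ArrayList<>(batch));
        }
        return total;
    }
}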


@ -49,6 +49,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@ -97,17 +98,19 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
/**
* creates a new recovery target object that represents a recovery to the provided indexShard
* Creates a new recovery target object that represents a recovery to the provided shard.
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed / failed
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
* version. Necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler}).
* version; necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler})
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
Callback<Long> ensureClusterStateVersionCallback) {
public RecoveryTarget(final IndexShard indexShard,
final DiscoveryNode sourceNode,
final PeerRecoveryTargetService.RecoveryListener listener,
final Callback<Long> ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@ -125,10 +128,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
}
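The ensureClusterStateVersionCallback boils down to a gate that blocks until the local node has applied at least a given cluster state version. A hypothetical monitor-based sketch of such a gate (not the actual callback wiring):
final class ClusterStateVersionGateSketch {
    private long appliedVersion = 0L;
    // called whenever this node applies a new cluster state
    synchronized void versionApplied(final long version) {
        appliedVersion = Math.max(appliedVersion, version);
        notifyAll();
    }
    // blocks until this node has applied at least the required cluster state version
    synchronized void ensureVersion(final long requiredVersion) throws InterruptedException {
        while (appliedVersion < requiredVersion) {
            wait();
        }
    }
}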
/**
* returns a fresh RecoveryTarget to retry recovery from the same source node onto the same IndexShard and using the same listener
* Returns a fresh recovery target to retry recovery from the same source node onto the same shard and using the same listener.
*
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(this.indexShard, this.sourceNode, this.listener, this.ensureClusterStateVersionCallback);
return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
}
public long recoveryId() {
@ -152,7 +157,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
return indexShard.recoveryState();
}
public CancellableThreads CancellableThreads() {
public CancellableThreads cancellableThreads() {
return cancellableThreads;
}
@ -220,7 +225,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
* <p>
* if {@link #CancellableThreads()} was used, the threads will be interrupted.
* if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {


@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
@ -29,17 +30,16 @@ import java.util.function.Function;
import java.util.function.Supplier;
/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
* as relocated an closed to ensure that the engine is closed and the target can acquire the IW write lock.
* A recovery handler that skips phase one as well as sending the translog snapshot.
*/
public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries, Logger logger) {
SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries, Logger logger) {
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger);
this.shard = shard;
this.request = request;
@ -49,8 +49,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
public RecoveryResponse recoverToTarget() throws IOException {
boolean engineClosed = false;
try {
logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
logger.trace("{} recovery [phase1] to {}: skipping phase1 for shared filesystem", request.shardId(), request.targetNode());
final long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
if (request.isPrimaryRelocation()) {
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
try {
@ -83,9 +83,9 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
}
@Override
protected int sendSnapshot(Translog.Snapshot snapshot) {
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}",
shard.shardId(), request.targetNode());
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode());
return 0;
}
}


@ -19,46 +19,60 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
/**
* Represents a request for starting a peer recovery.
*/
public class StartRecoveryRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private Store.MetadataSnapshot metadataSnapshot;
private boolean primaryRelocation;
private long startingSeqNo;
public StartRecoveryRequest() {
}
/**
* Start recovery request.
* Construct a request for starting a peer recovery.
*
* @param sourceNode The node to recover from
* @param targetNode The node to recover to
* @param shardId the shard ID to recover
* @param sourceNode the source node to recover from
* @param targetNode the target node to recover to
* @param metadataSnapshot the Lucene metadata
* @param primaryRelocation whether or not the recovery is a primary relocation
* @param recoveryId the recovery ID
* @param startingSeqNo the starting sequence number
*/
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Store.MetadataSnapshot metadataSnapshot, boolean primaryRelocation, long recoveryId) {
public StartRecoveryRequest(final ShardId shardId,
final DiscoveryNode sourceNode,
final DiscoveryNode targetNode,
final Store.MetadataSnapshot metadataSnapshot,
final boolean primaryRelocation,
final long recoveryId,
final long startingSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.metadataSnapshot = metadataSnapshot;
this.primaryRelocation = primaryRelocation;
this.startingSeqNo = startingSeqNo;
}
public long recoveryId() {
@ -85,6 +99,10 @@ public class StartRecoveryRequest extends TransportRequest {
return metadataSnapshot;
}
public long startingSeqNo() {
return startingSeqNo;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -94,6 +112,11 @@ public class StartRecoveryRequest extends TransportRequest {
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
primaryRelocation = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
startingSeqNo = in.readLong();
} else {
startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
@Override
@ -105,6 +128,9 @@ public class StartRecoveryRequest extends TransportRequest {
targetNode.writeTo(out);
metadataSnapshot.writeTo(out);
out.writeBoolean(primaryRelocation);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeLong(startingSeqNo);
}
}
}
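The readFrom/writeTo pair follows the standard wire-compatibility pattern: the new field is written and read only when the remote side is on a version that knows about it, with older peers falling back to the unassigned sentinel. A minimal round-trip sketch of the same pattern over plain java.io streams (the version constant and sentinel values are illustrative stand-ins):
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
final class VersionGatedWireSketch {
    static final int V_WITH_STARTING_SEQ_NO = 6_000_000; // stand-in for V_6_0_0_alpha1_UNRELEASED
    static final long UNASSIGNED_SEQ_NO = -2L;           // stand-in for the unassigned sentinel
    // write the field only when the receiver understands it
    static void write(final DataOutputStream out, final int receiverVersion, final long startingSeqNo) throws IOException {
        if (receiverVersion >= V_WITH_STARTING_SEQ_NO) {
            out.writeLong(startingSeqNo);
        }
    }
    // older senders omit the field, so fall back to the sentinel
    static long read(final DataInputStream in, final int senderVersion) throws IOException {
        return senderVersion >= V_WITH_STARTING_SEQ_NO ? in.readLong() : UNASSIGNED_SEQ_NO;
    }
    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(new DataOutputStream(bytes), V_WITH_STARTING_SEQ_NO, 42L);
        final long seqNo = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), V_WITH_STARTING_SEQ_NO);
        if (seqNo != 42L) {
            throw new AssertionError("round trip failed");
        }
    }
}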


@ -26,8 +26,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -45,7 +43,6 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
import java.nio.file.DirectoryStream;
@ -55,7 +52,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -384,105 +383,92 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue());
}
public void testReusePeerRecovery() throws Exception {
final Settings settings = Settings.builder()
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
.put("gateway.recover_after_nodes", 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
.put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
public void testReuseInFileBasedPeerRecovery() throws Exception {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0));
internalCluster().startNodes(4, settings);
// prevent any rebalance actions during the peer recovery
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
// we reuse the files on disk after full restarts for replicas.
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(indexSettings())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
ensureGreen();
logger.info("--> indexing docs");
for (int i = 0; i < 1000; i++) {
client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
if ((i % 200) == 0) {
client().admin().indices().prepareFlush().execute().actionGet();
}
}
if (randomBoolean()) {
client().admin().indices().prepareFlush().execute().actionGet();
}
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setForce(true).get();
boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {
logger.info("--> disabling allocation while the cluster is shut down");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
.get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();
logger.info("--> waiting for cluster to return to green after first shutdown");
ensureGreen();
} else {
logger.info("--> trying to sync flush");
assertEquals(client().admin().indices().prepareSyncedFlush("test").get().failedShards(), 0);
assertSyncIdsNotNull();
}
logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
// create the index
client(primaryNode)
.admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 1))
.get();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
logger.info("--> full cluster restart");
internalCluster().fullRestart();
logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
ensureGreen();
primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
if (useSyncIds) {
assertSyncIdsNotNull();
logger.info("--> indexing docs");
for (int i = 0; i < randomIntBetween(1, 1024); i++) {
client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
// start the replica node; we do this after indexing so a file-based recovery is triggered to ensure the files are identical
final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1));
ensureGreen();
final RecoveryResponse initialRecoveryResponse = client().admin().indices().prepareRecoveries("test").get();
final Set<String> files = new HashSet<>();
for (final RecoveryState recoveryState : initialRecoveryResponse.shardRecoveryStates().get("test")) {
if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
files.add(file.name());
}
break;
}
}
logger.info("--> restart replica node");
internalCluster().restartNode(replicaNode, new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// index some more documents; we expect to reuse the files that already exist on the replica
for (int i = 0; i < randomIntBetween(1, 1024); i++) {
client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}
// prevent a sequence-number-based recovery from being possible
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
return super.onNodeStopped(nodeName);
}
});
ensureGreen();
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (final RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
long recovered = 0;
for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
if (file.name().startsWith("segments")) {
long reused = 0;
int filesRecovered = 0;
int filesReused = 0;
for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
if (files.contains(file.name()) == false) {
recovered += file.length();
filesRecovered++;
} else {
reused += file.length();
filesReused++;
}
}
if (!recoveryState.getPrimary() && (useSyncIds == false)) {
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
// we have to recover the segments file since we commit the translog ID on engine startup
assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1));
assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - 1));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
} else {
if (useSyncIds && !recoveryState.getPrimary()) {
logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
}
if (recoveryState.getPrimary()) {
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
} else {
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("bytes should have been recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
// we have to recover the segments file since we commit the translog ID on engine startup
assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(), equalTo(reused));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(filesRecovered));
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
}
}
}
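The reuse accounting in this test reduces to partitioning the files touched by the second recovery by whether they already existed before the restart. A compact sketch of that bookkeeping (a hypothetical helper, not part of the test infrastructure); get(true) plays the role of filesReused and get(false) of filesRecovered:
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
final class FileReuseSketch {
    // splits the files seen by a recovery into reused (true) and recovered (false) counts
    static Map<Boolean, Long> partitionByReuse(final Set<String> filesBeforeRestart, final List<String> filesAfterRecovery) {
        return filesAfterRecovery.stream()
                .collect(Collectors.partitioningBy(filesBeforeRestart::contains, Collectors.counting()));
    }
}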


@ -110,6 +110,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@ -648,14 +649,14 @@ public class InternalEngineTests extends ESTestCase {
assertThat(stats1.getGeneration(), greaterThan(0L));
assertThat(stats1.getId(), notNullValue());
assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY));
assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(stats1.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
Long.parseLong(stats1.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
@ -677,9 +678,9 @@ public class InternalEngineTests extends ESTestCase {
stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY),
not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY)));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally {
IOUtils.close(engine);
}
@ -1772,14 +1773,14 @@ public class InternalEngineTests extends ESTestCase {
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint));
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
assertThat(
initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(maxSeqNo));
} finally {
@ -1793,13 +1794,13 @@ public class InternalEngineTests extends ESTestCase {
assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert
// that the committed max seq no is equivalent to what the current primary seq no is, as all data
// we have assigned sequence numbers to should be in the commit
@ -1861,11 +1862,11 @@ public class InternalEngineTests extends ESTestCase {
long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
Map<String, String> userData = commit.getUserData();
long localCheckpoint = userData.containsKey(InternalEngine.LOCAL_CHECKPOINT_KEY) ?
Long.parseLong(userData.get(InternalEngine.LOCAL_CHECKPOINT_KEY)) :
long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ?
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
SequenceNumbersService.NO_OPS_PERFORMED;
long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ?
Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) :
SequenceNumbersService.UNASSIGNED_SEQ_NO;
// local checkpoint and max seq no shouldn't go backwards
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));


@ -20,6 +20,7 @@
package org.elasticsearch.index.replication;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
@ -35,8 +36,11 @@ import org.elasticsearch.action.support.replication.TransportWriteActionTestHelp
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@ -46,6 +50,7 @@ import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
@ -169,14 +174,43 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
updateAllocationIDsOnPrimary();
}
private final Runnable replicaGlobalCheckpointSyncer = () -> {
throw new AssertionError("replicas can not sync global checkpoint");
};
public synchronized IndexShard addReplica() throws IOException {
final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData,
() -> { throw new AssertionError("replicas can't sync global checkpoint"); }, null);
final IndexShard replica =
newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, replicaGlobalCheckpointSyncer, null);
replicas.add(replica);
updateAllocationIDsOnPrimary();
return replica;
}
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
nodeId,
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, replicaGlobalCheckpointSyncer);
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
}
public synchronized List<IndexShard> getReplicas() {
return Collections.unmodifiableList(replicas);
}
synchronized boolean removeReplica(IndexShard replica) {
final boolean removed = replicas.remove(replica);
if (removed) {
updateAllocationIDsOnPrimary();
}
return removed;
}
public void recoverReplica(IndexShard replica) throws IOException {
recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}));
}
@ -186,8 +220,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
recoverReplica(replica, targetSupplier, true);
}
public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
public void recoverReplica(
IndexShard replica,
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
updateAllocationIDsOnPrimary();
}


@ -20,6 +20,8 @@
package org.elasticsearch.index.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
@ -34,6 +36,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
public void testIndexingDuringFileRecovery() throws Exception {
@ -57,11 +63,77 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
public void testRecoveryOfDisconnectedReplica() throws Exception {
try (final ReplicationGroup shards = createGroup(1)) {
shards.startAll();
int docs = shards.indexDocs(randomInt(50));
shards.flush();
shards.getPrimary().updateGlobalCheckpointOnPrimary();
final IndexShard originalReplica = shards.getReplicas().get(0);
long replicaCommittedLocalCheckpoint = docs - 1;
boolean replicaHasDocsSinceLastFlushedCheckpoint = false;
for (int i = 0; i < randomInt(2); i++) {
final int indexedDocs = shards.indexDocs(randomInt(5));
docs += indexedDocs;
if (indexedDocs > 0) {
replicaHasDocsSinceLastFlushedCheckpoint = true;
}
final boolean flush = randomBoolean();
if (flush) {
originalReplica.flush(new FlushRequest());
replicaHasDocsSinceLastFlushedCheckpoint = false;
replicaCommittedLocalCheckpoint = docs - 1;
}
final boolean sync = randomBoolean();
if (sync) {
shards.getPrimary().updateGlobalCheckpointOnPrimary();
}
}
shards.removeReplica(originalReplica);
final int missingOnReplica = shards.indexDocs(randomInt(5));
docs += missingOnReplica;
replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0;
if (randomBoolean()) {
shards.getPrimary().updateGlobalCheckpointOnPrimary();
}
final boolean flushPrimary = randomBoolean();
if (flushPrimary) {
shards.flush();
}
originalReplica.close("disconnected", false);
IOUtils.close(originalReplica.store());
final IndexShard recoveredReplica =
shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId());
shards.recoverReplica(recoveredReplica);
if (flushPrimary && replicaHasDocsSinceLastFlushedCheckpoint) {
// replica has something to catch up with, but since we flushed the primary, we should fall back to full recovery
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
} else {
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
assertThat(
recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1))));
}
docs += shards.indexDocs(randomInt(5));
shards.assertAllEqual(docs);
}
}
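The assertions above encode the decision rule this test exercises: the replica asks to replay from its committed local checkpoint plus one, and the primary can honor that only if it still retains those operations; a forced flush on the primary can trim them away and force a file-based recovery. A hypothetical sketch of the rule and of the expected replayed-operation count (simplifying away the primary's actual retention mechanics):
final class RecoveryModeSketch {
    enum Mode { OPS_BASED, FILE_BASED }
    // ops-based recovery is possible only if the source retains every op above the target's local checkpoint
    static Mode choose(final long targetLocalCheckpoint, final long sourceOldestRetainedSeqNo) {
        return targetLocalCheckpoint + 1 >= sourceOldestRetainedSeqNo ? Mode.OPS_BASED : Mode.FILE_BASED;
    }
    // in ops-based mode, the number of operations replayed from the source
    static long expectedReplayedOps(final long maxSeqNo, final long targetLocalCheckpoint) {
        return maxSeqNo - targetLocalCheckpoint;
    }
}
With maxSeqNo equal to docs - 1 and the target checkpoint equal to replicaCommittedLocalCheckpoint, expectedReplayedOps matches the docs - (replicaCommittedLocalCheckpoint + 1) assertion in the test.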
private static class BlockingTarget extends RecoveryTarget {
private final CountDownLatch recoveryBlocked;
private final CountDownLatch releaseRecovery;
private final RecoveryState.Stage stageToBlock;
public static final EnumSet<RecoveryState.Stage> SUPPORTED_STAGES =
static final EnumSet<RecoveryState.Stage> SUPPORTED_STAGES =
EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE);
private final Logger logger;
@ -119,4 +191,5 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
}
}
}


@ -42,19 +42,22 @@ import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTests extends ESTestCase {
GlobalCheckpointService checkpointService;
GlobalCheckpointTracker tracker;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO);
tracker =
new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
assertFalse("checkpoint shouldn't be updated when the are no active shards", checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertFalse("checkpoint shouldn't be updated when the are no active shards", tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
@ -81,7 +84,7 @@ public class GlobalCheckpointTests extends ESTestCase {
// it is, however, nice not to assume this at this level and to check that we do the right thing.
final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().forEach(aId -> {
@ -96,42 +99,42 @@ public class GlobalCheckpointTests extends ESTestCase {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
checkpointService.updateAllocationIdsFromMaster(active, initializing);
initializing.forEach(aId -> checkpointService.markAllocationIdAsInSync(aId));
allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
tracker.updateAllocationIdsFromMaster(active, initializing);
initializing.forEach(aId -> tracker.markAllocationIdAsInSync(aId));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/in-sync id; the checkpoint shouldn't change, but a refresh should be requested.
final String extraId = "extra_" + randomAsciiOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(checkpointService.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
checkpointService.updateAllocationIdsFromMaster(newActive, initializing);
tracker.updateAllocationIdsFromMaster(newActive, initializing);
// we should ask for a refresh, but not update the checkpoint
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// now notify for the new id
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
// now it should be incremented
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), greaterThan(maxLocalCheckpoint));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
@ -140,60 +143,60 @@ public class GlobalCheckpointTests extends ESTestCase {
final Map<String, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
checkpointService.updateAllocationIdsFromMaster(
tracker.updateAllocationIdsFromMaster(
new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(checkpointService::markAllocationIdAsInSync);
assigned.forEach(checkpointService::updateLocalCheckpoint);
randomSubsetOf(initializing.keySet()).forEach(tracker::markAllocationIdAsInSync);
assigned.forEach(tracker::updateLocalCheckpoint);
// now mark all active shards
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
// global checkpoint can't be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
assigned.forEach(checkpointService::updateLocalCheckpoint);
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
assigned.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> checkpointService.updateLocalCheckpoint(aId, initializing.get(aId)));
initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
active.forEach(checkpointService::updateLocalCheckpoint);
active.forEach(tracker::updateLocalCheckpoint);
// global checkpoint can't be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
initializing.forEach(checkpointService::updateLocalCheckpoint);
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
initializing.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
nonApproved.keySet().forEach(checkpointService::markAllocationIdAsInSync);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
nonApproved.keySet().forEach(tracker::markAllocationIdAsInSync);
List<Map<String, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
Collections.shuffle(allocations, random());
allocations.forEach(a -> a.forEach(checkpointService::updateLocalCheckpoint));
allocations.forEach(a -> a.forEach(tracker::updateLocalCheckpoint));
// global checkpoint can be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
@ -212,33 +215,33 @@ public class GlobalCheckpointTests extends ESTestCase {
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
checkpointService.updateAllocationIdsFromMaster(active, initializing);
tracker.updateAllocationIdsFromMaster(active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(checkpointService::markAllocationIdAsInSync);
initializingToStay.keySet().forEach(tracker::markAllocationIdAsInSync);
} else {
initializing.forEach(checkpointService::markAllocationIdAsInSync);
initializing.forEach(tracker::markAllocationIdAsInSync);
}
if (randomBoolean()) {
allocations.forEach(checkpointService::updateLocalCheckpoint);
allocations.forEach(tracker::updateLocalCheckpoint);
}
// global checkpoint may be advanced, but we need a sync in any case
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertTrue(tracker.updateCheckpointOnPrimary());
// now remove shards
if (randomBoolean()) {
checkpointService.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> checkpointService.updateLocalCheckpoint(aid, ckp + 10L));
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> checkpointService.updateLocalCheckpoint(aid, ckp + 10L));
checkpointService.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
.min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time
// global checkpoint is advanced and we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(checkpoint));
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(checkpoint));
}
}
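These tests pin down the invariant behind the tracker: the global checkpoint is the minimum of the local checkpoints of all copies the master has marked in-sync, and updates for allocation IDs the master has not blessed are ignored. A minimal single-threaded sketch of that invariant (hypothetical, not the tracker itself):
import java.util.HashMap;
import java.util.Map;
final class GlobalCheckpointSketch {
    static final long UNASSIGNED_SEQ_NO = -2L;
    private final Map<String, Long> inSyncLocalCheckpoints = new HashMap<>();
    // only allocation IDs marked in-sync participate in the computation
    void markInSync(final String allocationId) {
        inSyncLocalCheckpoints.putIfAbsent(allocationId, UNASSIGNED_SEQ_NO);
    }
    // updates from unknown allocation IDs are silently ignored
    void updateLocalCheckpoint(final String allocationId, final long checkpoint) {
        inSyncLocalCheckpoints.computeIfPresent(allocationId, (id, current) -> Math.max(current, checkpoint));
    }
    // the global checkpoint is the minimum local checkpoint across all in-sync copies
    long computeGlobalCheckpoint() {
        return inSyncLocalCheckpoints.values().stream().mapToLong(Long::longValue).min().orElse(UNASSIGNED_SEQ_NO);
    }
}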


@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
@ -31,16 +32,18 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
public class LocalCheckpointServiceTests extends ESTestCase {
public class LocalCheckpointTrackerTests extends ESTestCase {
private LocalCheckpointService checkpointService;
private LocalCheckpointTracker tracker;
private final int SMALL_CHUNK_SIZE = 4;
@ -48,45 +51,47 @@ public class LocalCheckpointServiceTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
checkpointService = getCheckpointService();
tracker = getTracker();
}
private LocalCheckpointService getCheckpointService() {
return new LocalCheckpointService(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test",
Settings.builder()
.put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED);
private LocalCheckpointTracker getTracker() {
return new LocalCheckpointTracker(
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED
);
}
public void testSimplePrimary() {
long seqNo1, seqNo2;
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
seqNo1 = checkpointService.generateSeqNo();
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
checkpointService.markSeqNoAsCompleted(seqNo1);
assertThat(checkpointService.getCheckpoint(), equalTo(0L));
seqNo1 = checkpointService.generateSeqNo();
seqNo2 = checkpointService.generateSeqNo();
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(0L));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
checkpointService.markSeqNoAsCompleted(seqNo2);
assertThat(checkpointService.getCheckpoint(), equalTo(0L));
checkpointService.markSeqNoAsCompleted(seqNo1);
assertThat(checkpointService.getCheckpoint(), equalTo(2L));
tracker.markSeqNoAsCompleted(seqNo2);
assertThat(tracker.getCheckpoint(), equalTo(0L));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(2L));
}
public void testSimpleReplica() {
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
checkpointService.markSeqNoAsCompleted(0L);
assertThat(checkpointService.getCheckpoint(), equalTo(0L));
checkpointService.markSeqNoAsCompleted(2L);
assertThat(checkpointService.getCheckpoint(), equalTo(0L));
checkpointService.markSeqNoAsCompleted(1L);
assertThat(checkpointService.getCheckpoint(), equalTo(2L));
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
}
public void testSimpleOverFlow() {
@ -99,11 +104,11 @@ public class LocalCheckpointServiceTests extends ESTestCase {
}
Collections.shuffle(seqNoList, random());
for (Integer seqNo : seqNoList) {
checkpointService.markSeqNoAsCompleted(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
}
assertThat(checkpointService.checkpoint, equalTo(maxOps - 1L));
assertThat(checkpointService.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
}
public void testConcurrentPrimary() throws InterruptedException {
@ -125,10 +130,10 @@ public class LocalCheckpointServiceTests extends ESTestCase {
protected void doRun() throws Exception {
barrier.await();
for (int i = 0; i < opsPerThread; i++) {
long seqNo = checkpointService.generateSeqNo();
long seqNo = tracker.generateSeqNo();
logger.info("[t{}] started [{}]", threadId, seqNo);
if (seqNo != unFinishedSeq) {
checkpointService.markSeqNoAsCompleted(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
logger.info("[t{}] completed [{}]", threadId, seqNo);
}
}
@ -139,12 +144,12 @@ public class LocalCheckpointServiceTests extends ESTestCase {
for (Thread thread : threads) {
thread.join();
}
assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L));
checkpointService.markSeqNoAsCompleted(unFinishedSeq);
assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(checkpointService.processedSeqNo.size(), isOneOf(0, 1));
assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
}
public void testConcurrentReplica() throws InterruptedException {
@ -177,7 +182,7 @@ public class LocalCheckpointServiceTests extends ESTestCase {
Integer[] ops = seqNoPerThread[threadId];
for (int seqNo : ops) {
if (seqNo != unFinishedSeq) {
checkpointService.markSeqNoAsCompleted(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
logger.info("[t{}] completed [{}]", threadId, seqNo);
}
}
@ -188,11 +193,48 @@ public class LocalCheckpointServiceTests extends ESTestCase {
for (Thread thread : threads) {
thread.join();
}
assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L));
checkpointService.markSeqNoAsCompleted(unFinishedSeq);
assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
}
public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
final int seqNo = randomIntBetween(0, 32);
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean complete = new AtomicBoolean();
final Thread thread = new Thread(() -> {
try {
// synchronize starting with the test thread
barrier.await();
tracker.waitForOpsToComplete(seqNo);
complete.set(true);
// synchronize with the test thread checking if we are no longer waiting
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
// synchronize starting with the waiting thread
barrier.await();
final List<Integer> elements = IntStream.rangeClosed(0, seqNo).boxed().collect(Collectors.toList());
Randomness.shuffle(elements);
for (int i = 0; i < elements.size() - 1; i++) {
tracker.markSeqNoAsCompleted(elements.get(i));
assertFalse(complete.get());
}
tracker.markSeqNoAsCompleted(elements.get(elements.size() - 1));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
thread.join();
}
}

View File

@ -0,0 +1,236 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Tokenizer;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalyzerProvider;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class EvilPeerRecoveryIT extends ESIntegTestCase {
private static AtomicReference<CountDownLatch> indexLatch = new AtomicReference<>();
private static AtomicReference<CountDownLatch> waitForOpsToCompleteLatch = new AtomicReference<>();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(LatchAnalysisPlugin.class);
}
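/*
 * An analysis plugin that registers the "latch_analyzer" used by the test mapping; the analyzer lets the test trap indexing operations
 * inside the engine after they have been assigned sequence numbers.
 */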
public static class LatchAnalysisPlugin extends Plugin implements AnalysisPlugin {
@Override
public Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> getAnalyzers() {
return Collections.singletonMap("latch_analyzer", (a, b, c, d) -> new LatchAnalyzerProvider());
}
}
static class LatchAnalyzerProvider implements AnalyzerProvider<LatchAnalyzer> {
@Override
public String name() {
return "latch_analyzer";
}
@Override
public AnalyzerScope scope() {
return AnalyzerScope.INDICES;
}
@Override
public LatchAnalyzer get() {
return new LatchAnalyzer();
}
}
static class LatchAnalyzer extends Analyzer {
@Override
protected TokenStreamComponents createComponents(final String fieldName) {
return new TokenStreamComponents(new LatchTokenizer());
}
}
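/*
 * A tokenizer that produces no tokens; it only counts down the index latch and then parks on the wait-for-ops latch, keeping the
 * indexing operation that triggered the analysis in flight.
 */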
static class LatchTokenizer extends Tokenizer {
@Override
public final boolean incrementToken() throws IOException {
try {
if (indexLatch.get() != null) {
// latch that all expected operations are in the engine
indexLatch.get().countDown();
}
if (waitForOpsToCompleteLatch.get() != null) {
// latch that waits for the replica to restart and allows recovery to proceed
waitForOpsToCompleteLatch.get().await();
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
}
/*
* This tests that sequence-number-based recoveries wait for in-flight operations to complete. The trick here is simple. We latch some
* in-flight operations inside the engine after sequence numbers are assigned. While these operations are latched, we restart a replica.
* Sequence-number-based recovery on this replica has to wait until these in-flight operations complete to proceed. We verify at the end
* of recovery that a file-based recovery was not completed, and that the expected number of operations was replayed via the translog.
*/
public void testRecoveryWaitsForOps() throws Exception {
final int docs = randomIntBetween(1, 64);
try {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0));
// prepare mapping that uses our latch analyzer
final XContentBuilder mapping = jsonBuilder();
mapping.startObject();
{
mapping.startObject("type");
{
mapping.startObject("properties");
{
mapping.startObject("foo");
{
mapping.field("type", "text");
mapping.field("analyzer", "latch_analyzer");
mapping.endObject();
}
mapping.endObject();
}
mapping.endObject();
}
mapping.endObject();
}
// create the index with our mapping
client()
.admin()
.indices()
.prepareCreate("index")
.addMapping("type", mapping)
.setSettings(Settings.builder().put("number_of_shards", 1))
.get();
// start the replica node; we do this after creating the index so we can control which node holds the primary shard
final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1));
ensureGreen();
// index some documents so that the replica will attempt a sequence-number-based recovery upon restart
for (int foo = 0; foo < docs; foo++) {
index(randomFrom(primaryNode, replicaNode), foo);
}
if (randomBoolean()) {
client().admin().indices().flush(new FlushRequest()).get();
}
// start some in-flight operations that will get latched in the engine
final List<Thread> threads = new ArrayList<>();
final int latchedDocs = internalCluster().getInstance(ThreadPool.class, replicaNode).info(ThreadPool.Names.BULK).getMax();
indexLatch.set(new CountDownLatch(latchedDocs));
waitForOpsToCompleteLatch.set(new CountDownLatch(1));
for (int i = docs; i < docs + latchedDocs; i++) {
final int foo = i;
// we have to index through the primary since we are going to restart the replica
final Thread thread = new Thread(() -> index(primaryNode, foo));
threads.add(thread);
thread.start();
}
// latch until all operations are inside the engine
indexLatch.get().await();
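// restart the replica while the latched operations are still in flight; its recovery cannot complete until they do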
internalCluster().restartNode(replicaNode, new InternalTestCluster.RestartCallback());
final Index index = resolveIndex("index");
// wait until recovery starts
assertBusy(() -> {
final IndicesService primaryService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertThat(primaryService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
final IndicesService replicaService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertThat(replicaService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
}
);
// unlatch the operations that are latched inside the engine
waitForOpsToCompleteLatch.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
// recovery should complete successfully
ensureGreen();
// verify that a sequence-number-based recovery was completed
final org.elasticsearch.action.admin.indices.recovery.RecoveryResponse response =
client().admin().indices().prepareRecoveries("index").get();
final List<RecoveryState> states = response.shardRecoveryStates().get("index");
for (final RecoveryState state : states) {
if (state.getTargetNode().getName().equals(replicaNode)) {
assertThat(state.getTranslog().recoveredOperations(), equalTo(latchedDocs));
assertThat(state.getIndex().recoveredFilesPercent(), equalTo(0f));
}
}
} finally {
internalCluster().close();
}
}
private void index(final String node, final int foo) {
client(node).prepareIndex("index", "type").setSource("{\"foo\":\"" + Integer.toString(foo) + "\"}").get();
}
}

View File

@ -44,7 +44,7 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
private static final int RELOCATION_COUNT = 25;
@TestLogging("_root:DEBUG,org.elasticsearch.action.delete:TRACE,org.elasticsearch.action.index:TRACE,index.shard:TRACE,org.elasticsearch.cluster.service:TRACE")
@TestLogging("_root:DEBUG,org.elasticsearch.action.delete:TRACE,org.elasticsearch.action.index:TRACE,org.elasticsearch.index.shard:TRACE,cluster.service:TRACE")
public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin().indices().prepareCreate("test")
@ -97,4 +97,5 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
ElasticsearchAssertions.assertHitCount(client().prepareSearch("test")// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true)).get(), numAutoGenDocs.get());
}
}

View File

@ -41,6 +41,9 @@ import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
@ -65,7 +68,9 @@ import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -79,10 +84,14 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null, randomBoolean(), randomLong());
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger);
@ -131,10 +140,15 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null, randomBoolean(), randomLong());
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : 0L);
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@ -195,10 +209,15 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null, randomBoolean(), randomLong());
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : 0L);
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@ -254,48 +273,88 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Completed() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null, false, randomLong());
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
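// randomly decide whether a sequence-number-based recovery is attempted and, if it is, whether the translog can actually serve it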
final boolean attemptSequenceNumberBasedRecovery = randomBoolean();
final boolean isTranslogReadyForSequenceNumberBasedRecovery = attemptSequenceNumberBasedRecovery && randomBoolean();
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
false,
randomNonNegativeLong(),
attemptSequenceNumberBasedRecovery ? randomNonNegativeLong() : SequenceNumbersService.UNASSIGNED_SEQ_NO);
final IndexShard shard = mock(IndexShard.class);
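// the source handler consults the shard's sequence number and segment statistics, so the mock stubs both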
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
final Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
RecoverySourceHandler handler = new RecoverySourceHandler(shard, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new RecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
request,
() -> 0L,
e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(),
logger) {
@Override
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) {
return isTranslogReadyForSequenceNumberBasedRecovery;
}
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
phase1Called.set(true);
}
@Override
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
prepareTargetForTranslogCalled.set(true);
}
@Override
public void phase2(Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
expectThrows(IndexShardRelocatedException.class, () -> handler.recoverToTarget());
assertTrue(phase1Called.get());
expectThrows(IndexShardRelocatedException.class, handler::recoverToTarget);
// phase1 should only be attempted if we are not doing a sequence-number-based recovery
assertThat(phase1Called.get(), equalTo(!isTranslogReadyForSequenceNumberBasedRecovery));
assertTrue(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
}
public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, InterruptedException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null, true, randomLong());
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
AtomicBoolean ensureClusterStateVersionCalled = new AtomicBoolean();
AtomicBoolean recoveriesDelayed = new AtomicBoolean();
AtomicBoolean relocated = new AtomicBoolean();
final boolean attemptSequenceNumberBasedRecovery = randomBoolean();
final boolean isTranslogReadyForSequenceNumberBasedRecovery = attemptSequenceNumberBasedRecovery && randomBoolean();
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
true,
randomNonNegativeLong(),
attemptSequenceNumberBasedRecovery ? randomNonNegativeLong() : SequenceNumbersService.UNASSIGNED_SEQ_NO);
final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final AtomicBoolean ensureClusterStateVersionCalled = new AtomicBoolean();
final AtomicBoolean recoveriesDelayed = new AtomicBoolean();
final AtomicBoolean relocated = new AtomicBoolean();
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
final Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).then(i -> relocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED);
doAnswer(i -> {
@ -304,16 +363,17 @@ public class RecoverySourceHandlerTests extends ESTestCase {
return null;
}).when(shard).relocated(any(String.class));
RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class);
final Supplier<Long> currentClusterStateVersionSupplier = () -> {
assertFalse(ensureClusterStateVersionCalled.get());
assertTrue(recoveriesDelayed.get());
ensureClusterStateVersionCalled.set(true);
return 0L;
};
final Function<String, Releasable> delayNewRecoveries = s -> {
assertTrue(phase1Called.get());
// phase1 should only be attempted if we are not doing a sequence-number-based recovery
assertThat(phase1Called.get(), equalTo(!isTranslogReadyForSequenceNumberBasedRecovery));
assertTrue(prepareTargetForTranslogCalled.get());
assertTrue(phase2Called.get());
assertFalse(recoveriesDelayed.get());
@ -324,8 +384,19 @@ public class RecoverySourceHandlerTests extends ESTestCase {
};
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, targetHandler, request, currentClusterStateVersionSupplier,
delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger) {
final RecoverySourceHandler handler = new RecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
request,
currentClusterStateVersionSupplier,
delayNewRecoveries,
recoverySettings.getChunkSize().bytesAsInt(),
logger) {
@Override
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) {
return isTranslogReadyForSequenceNumberBasedRecovery;
}
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
@ -333,13 +404,22 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
public void phase2(Translog.Snapshot snapshot) {
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
prepareTargetForTranslogCalled.set(true);
}
@Override
public void phase2(final Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
handler.recoverToTarget();
assertTrue(ensureClusterStateVersionCalled.get());
assertTrue(phase1Called.get());
// phase1 should only be attempted if we are not doing a sequence-number-based recovery
assertThat(phase1Called.get(), equalTo(!isTranslogReadyForSequenceNumberBasedRecovery));
assertTrue(prepareTargetForTranslogCalled.get());
assertTrue(phase2Called.get());
assertTrue(relocated.get());
assertFalse(recoveriesDelayed.get());

View File

@ -23,6 +23,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESTestCase;
@ -33,36 +35,44 @@ import java.io.ByteArrayOutputStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.equalTo;
public class StartRecoveryRequestTests extends ESTestCase {
public void testSerialization() throws Exception {
Version targetNodeVersion = randomVersion(random());
StartRecoveryRequest outRequest = new StartRecoveryRequest(
final Version targetNodeVersion = randomVersion(random());
final StartRecoveryRequest outRequest = new StartRecoveryRequest(
new ShardId("test", "_na_", 0),
new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
Store.MetadataSnapshot.EMPTY,
randomBoolean(),
1L
);
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(targetNodeVersion);
outRequest.writeTo(out);
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
final ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
StartRecoveryRequest inRequest = new StartRecoveryRequest();
final StartRecoveryRequest inRequest = new StartRecoveryRequest();
inRequest.readFrom(in);
assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode()));
assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode()));
assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap()));
assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
assertThat(outRequest.isPrimaryRelocation(), equalTo(inRequest.isPrimaryRelocation()));
assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
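// startingSeqNo is only serialized for target nodes on or after 6.0.0-alpha1; older versions read it back as unassigned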
if (targetNodeVersion.onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
assertThat(outRequest.startingSeqNo(), equalTo(inRequest.startingSeqNo()));
} else {
assertThat(SequenceNumbersService.UNASSIGNED_SEQ_NO, equalTo(inRequest.startingSeqNo()));
}
}
}

View File

@ -129,7 +129,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60));
final long resetRecoveryId = resetRecovery.recoveryId();
assertNotSame(recoveryTarget, resetRecovery);
assertNotSame(recoveryTarget.CancellableThreads(), resetRecovery.CancellableThreads());
assertNotSame(recoveryTarget.cancellableThreads(), resetRecovery.cancellableThreads());
assertSame(indexShard, resetRecovery.indexShard());
assertSame(store, resetRecovery.store());
assertEquals(referencesToStore, resetRecovery.store().refCount());

View File

@ -176,7 +176,7 @@ public class IndexingIT extends ESRestTestCase {
final Node node = nodes.getSafe(nodeId);
final SeqNoStats seqNoStats;
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max");
Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max_seq_no");
Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint");
Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint");
seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
@ -40,6 +41,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
@ -48,12 +50,15 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
@ -76,10 +81,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
/**
* A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
@ -377,30 +385,45 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
/**
* Recovers a replica from the give primary, allow the user to supply a custom recovery target.
* A typical usage of a custome recovery target is to assert things in the various stages of recovery
*
* @param markAsRecovering set to false if you have already marked the replica as recovering
* Recovers a replica from the given primary, allowing the user to supply a custom recovery target. A typical usage of a custom recovery
* target is to assert things in the various stages of recovery.
* @param replica the recovery target shard
* @param primary the recovery source shard
* @param targetSupplier supplies an instance of {@link RecoveryTarget}
* @param markAsRecovering set to {@code false} if the replica has already been marked as recovering
*/
protected void recoverReplica(IndexShard replica, IndexShard primary,
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering)
throws IOException {
protected final void recoverReplica(final IndexShard replica,
final IndexShard primary,
final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
final boolean markAsRecovering) throws IOException {
final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
if (markAsRecovering) {
replica.markAsRecovering("remote",
new RecoveryState(replica.routingEntry(), pNode, rNode));
replica.markAsRecovering("remote", new RecoveryState(replica.routingEntry(), pNode, rNode));
} else {
assertEquals(replica.state(), IndexShardState.RECOVERING);
}
replica.prepareForIndexRecovery();
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
getMetadataSnapshotOrEmpty(replica), false, 0);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {
},
(int) ByteSizeUnit.MB.toKB(1), logger);
final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
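// a sequence-number-based recovery can only be attempted if the replica has existing store files to compute a starting point from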
final long startingSeqNo;
if (snapshot.size() > 0) {
startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget);
} else {
startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
final StartRecoveryRequest request =
new StartRecoveryRequest(replica.shardId(), pNode, rNode, snapshot, false, 0, startingSeqNo);
final RecoverySourceHandler recovery = new RecoverySourceHandler(
primary,
recoveryTarget,
request,
() -> 0L,
e -> () -> {},
(int) ByteSizeUnit.MB.toBytes(1),
logger);
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));