commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
- commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
+ commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
- commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
+ commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
similarity index 88%
rename from core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java
rename to core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
index 68eaf86f4f8..fd9b2f4687f 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
@@ -22,25 +22,27 @@ package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
+import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
- * A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
- * all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
- * master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
- * These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
- * The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
+ * This class is responsible for tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower
+ * (or equal) sequence numbers have been processed on all shards that are currently active. Since shards count as "active" when the master
+ * starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These
+ * shards have received all old operations via the recovery mechanism and are kept up to date by the various replication actions. The set
+ * of shards that are taken into account for the global checkpoint calculation is called the "in-sync shards".
*
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
-public class GlobalCheckpointService extends AbstractIndexShardComponent {
+public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
@@ -63,14 +65,14 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
private long globalCheckpoint;
/**
- * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
- * shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
+ * Initialize the global checkpoint tracker. The specified global checkpoint should be set to the last known global checkpoint, or
+ * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
- * @param shardId the shard this service is tracking local checkpoints for
+ * @param shardId the shard ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
- GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
+ GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
@@ -127,8 +129,9 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
- throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
- + "] is lower than previous one [" + globalCheckpoint + "]");
+ final String message =
+ String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
+ throw new IllegalStateException(message);
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
similarity index 75%
rename from core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java
rename to core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
index 7da833cf866..06af7a2ceca 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
@@ -20,18 +20,17 @@
package org.elasticsearch.index.seqno;
import org.apache.lucene.util.FixedBitSet;
+import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
import java.util.LinkedList;
/**
- * This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which all previous
- * sequence numbers have been processed (inclusive).
+ * This class generates sequence numbers and keeps track of the so-called "local checkpoint", which is the highest number for which all
+ * previous sequence numbers have been processed (inclusive).
*/
-public class LocalCheckpointService extends AbstractIndexShardComponent {
+public class LocalCheckpointTracker {
/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
@@ -67,17 +66,15 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
private volatile long nextSeqNo;
/**
- * Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
- * {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint for this
- * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
+ * Initialize the local checkpoint tracker. The {@code maxSeqNo} should be set to the last sequence number assigned, or
+ * {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code localCheckpoint} should be set to the last known local checkpoint,
+ * or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
*
- * @param shardId the shard this service is providing tracking local checkpoints for
* @param indexSettings the index settings
- * @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
- * @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
+ * @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
+ * @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
*/
- LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
- super(shardId, indexSettings);
+ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
@@ -107,7 +104,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
*
* @param seqNo the sequence number to mark as completed
*/
- synchronized void markSeqNoAsCompleted(final long seqNo) {
+ public synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
@@ -142,10 +139,25 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
return nextSeqNo - 1;
}
+ /**
+ * Waits for all operations up to the provided sequence number to complete.
+ *
+ * @param seqNo the sequence number that the checkpoint must advance to before this method returns
+ * @throws InterruptedException if the thread was interrupted while blocking on the condition
+ */
+ @SuppressForbidden(reason = "Object#wait")
+ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
+ while (checkpoint < seqNo) {
+ // notified by updateCheckpoint
+ this.wait();
+ }
+ }
+
/**
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* current checkpoint is processed.
*/
+ @SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
@@ -154,19 +166,24 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
- // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
- FixedBitSet current = processedSeqNo.getFirst();
- do {
- checkpoint++;
- // the checkpoint always falls in the first bit set or just before. If it falls
- // on the last bit of the current bit set, we can clean it.
- if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
- processedSeqNo.removeFirst();
- firstProcessedSeqNo += bitArraysSize;
- assert checkpoint - firstProcessedSeqNo < bitArraysSize;
- current = processedSeqNo.peekFirst();
- }
- } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
+ try {
+ // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
+ FixedBitSet current = processedSeqNo.getFirst();
+ do {
+ checkpoint++;
+ // the checkpoint always falls in the first bit set or just before. If it falls
+ // on the last bit of the current bit set, we can clean it.
+ if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
+ processedSeqNo.removeFirst();
+ firstProcessedSeqNo += bitArraysSize;
+ assert checkpoint - firstProcessedSeqNo < bitArraysSize;
+ current = processedSeqNo.peekFirst();
+ }
+ } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
+ } finally {
+ // notifies waiters in waitForOpsToComplete
+ this.notifyAll();
+ }
}
/**
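[Editor's note: the wait/notify handshake added above pairs waitForOpsToComplete with the notifyAll in updateCheckpoint's finally block. A minimal self-contained sketch of the pattern, using a hypothetical simplified tracker (the real bit-set bookkeeping is omitted):

    // Hypothetical, simplified model of the synchronization added to LocalCheckpointTracker.
    class SimpleCheckpointTracker {
        private long checkpoint = -1; // highest seqNo such that all lower seqNos are processed

        synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
            while (checkpoint < seqNo) {
                wait(); // woken by markSeqNoAsCompleted via notifyAll
            }
        }

        synchronized void markSeqNoAsCompleted(final long seqNo) {
            // toy model: assumes sequence numbers complete strictly in order
            if (seqNo == checkpoint + 1) {
                checkpoint = seqNo;
            }
            notifyAll(); // wake any threads blocked in waitForOpsToComplete
        }
    }
]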
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java
index b18bfc09400..12d82f0813b 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java
@@ -30,7 +30,7 @@ import java.io.IOException;
public class SeqNoStats implements ToXContent, Writeable {
private static final String SEQ_NO = "seq_no";
- private static final String MAX_SEQ_NO = "max";
+ private static final String MAX_SEQ_NO = "max_seq_no";
private static final String LOCAL_CHECKPOINT = "local_checkpoint";
private static final String GLOBAL_CHECKPOINT = "global_checkpoint";
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java
new file mode 100644
index 00000000000..c3950e1012a
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import java.util.Map;
+
+/**
+ * A utility class for handling sequence numbers.
+ */
+public class SequenceNumbers {
+
+ public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
+ public static final String MAX_SEQ_NO = "max_seq_no";
+
+ /**
+ * Reads the sequence number stats from the commit data (maximum sequence number and local checkpoint) and uses the specified global
+ * checkpoint.
+ *
+ * @param globalCheckpoint the global checkpoint to use
+ * @param commitData the commit data
+ * @return the sequence number stats
+ */
+ public static SeqNoStats loadSeqNoStatsFromLuceneCommit(
+ final long globalCheckpoint,
+ final Iterable<Map.Entry<String, String>> commitData) {
+ long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
+ long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+
+ for (final Map.Entry<String, String> entry : commitData) {
+ final String key = entry.getKey();
+ if (key.equals(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) {
+ assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
+ localCheckpoint = Long.parseLong(entry.getValue());
+ } else if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
+ assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : maxSeqNo;
+ maxSeqNo = Long.parseLong(entry.getValue());
+ }
+ }
+
+ return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
+ }
+
+}
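[Editor's note: a hedged usage sketch of the new utility, tying it to the commit-data keys written in the engine hunk at the top of this diff; the numeric values are hypothetical, and the getMaxSeqNo/getGlobalCheckpoint accessor names are assumed from SeqNoStats (getLocalCheckpoint is used later in this diff):

    import java.util.HashMap;
    import java.util.Map;

    final Map<String, String> commitData = new HashMap<>();
    commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(42));
    commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(57));

    // the global checkpoint is not stored in the commit and must be supplied by the caller
    final SeqNoStats stats = SequenceNumbers.loadSeqNoStatsFromLuceneCommit(40L, commitData.entrySet());
    assert stats.getMaxSeqNo() == 57;
    assert stats.getLocalCheckpoint() == 42;
    assert stats.getGlobalCheckpoint() == 40;
]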
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 9fed5f9cf7e..a0ba1e850aa 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -40,8 +40,8 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public static final long NO_OPS_PERFORMED = -1L;
- private final LocalCheckpointService localCheckpointService;
- private final GlobalCheckpointService globalCheckpointService;
+ private final LocalCheckpointTracker localCheckpointTracker;
+ private final GlobalCheckpointTracker globalCheckpointTracker;
/**
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
@@ -62,8 +62,8 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
- localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
- globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
+ localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
+ globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint);
}
/**
@@ -73,26 +73,36 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the next assigned sequence number
*/
public long generateSeqNo() {
- return localCheckpointService.generateSeqNo();
+ return localCheckpointTracker.generateSeqNo();
}
/**
- * The maximum sequence number issued so far. See {@link LocalCheckpointService#getMaxSeqNo()} for additional details.
+ * The maximum sequence number issued so far. See {@link LocalCheckpointTracker#getMaxSeqNo()} for additional details.
*
* @return the maximum sequence number
*/
public long getMaxSeqNo() {
- return localCheckpointService.getMaxSeqNo();
+ return localCheckpointTracker.getMaxSeqNo();
+ }
+
+ /**
+ * Waits for all operations up to the provided sequence number to complete.
+ *
+ * @param seqNo the sequence number that the checkpoint must advance to before this method returns
+ * @throws InterruptedException if the thread was interrupted while blocking on the condition
+ */
+ public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
+ localCheckpointTracker.waitForOpsToComplete(seqNo);
}
/**
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
- * See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} for additional details.
+ * See {@link LocalCheckpointTracker#markSeqNoAsCompleted(long)} for additional details.
*
* @param seqNo the sequence number to mark as completed
*/
public void markSeqNoAsCompleted(final long seqNo) {
- localCheckpointService.markSeqNoAsCompleted(seqNo);
+ localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
/**
@@ -106,23 +116,23 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See
- * {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
+ * {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for details.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
*/
public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
- globalCheckpointService.updateLocalCheckpoint(allocationId, checkpoint);
+ globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
}
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
- * {@link GlobalCheckpointService#markAllocationIdAsInSync(String)} for additional details.
+ * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
*/
public void markAllocationIdAsInSync(final String allocationId) {
- globalCheckpointService.markAllocationIdAsInSync(allocationId);
+ globalCheckpointTracker.markAllocationIdAsInSync(allocationId);
}
/**
@@ -131,7 +141,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the local checkpoint
*/
public long getLocalCheckpoint() {
- return localCheckpointService.getCheckpoint();
+ return localCheckpointTracker.getCheckpoint();
}
/**
@@ -140,7 +150,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @return the global checkpoint
*/
public long getGlobalCheckpoint() {
- return globalCheckpointService.getCheckpoint();
+ return globalCheckpointTracker.getCheckpoint();
}
/**
@@ -150,7 +160,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
- return globalCheckpointService.updateCheckpointOnPrimary();
+ return globalCheckpointTracker.updateCheckpointOnPrimary();
}
/**
@@ -159,18 +169,18 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @param checkpoint the global checkpoint
*/
public void updateGlobalCheckpointOnReplica(final long checkpoint) {
- globalCheckpointService.updateCheckpointOnReplica(checkpoint);
+ globalCheckpointTracker.updateCheckpointOnReplica(checkpoint);
}
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
- globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
+ globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index a7094c240f4..47ca9c0e6cf 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -100,7 +100,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
-import org.elasticsearch.index.seqno.GlobalCheckpointService;
+import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -1368,7 +1368,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See
- * {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
+ * {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for details.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
@@ -1378,9 +1378,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
getEngine().seqNoService().updateLocalCheckpointForShard(allocationId, checkpoint);
}
+ /**
+ * Waits for all operations up to the provided sequence number to complete.
+ *
+ * @param seqNo the sequence number that the checkpoint must advance to before this method returns
+ * @throws InterruptedException if the thread was interrupted while blocking on the condition
+ */
+ public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
+ getEngine().seqNoService().waitForOpsToComplete(seqNo);
+ }
+
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
- * {@link GlobalCheckpointService#markAllocationIdAsInSync(String)} for additional details.
+ * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
*/
@@ -1430,7 +1440,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index fcc079d7909..ea4097c1362 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
@@ -75,6 +76,8 @@ import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@@ -207,6 +210,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
+ /**
+ * Loads the local checkpoint and the maximum sequence number from the latest Lucene commit point and returns the triplet of local and
+ * global checkpoints, and maximum sequence number as an instance of {@link SeqNoStats}. The global checkpoint must be provided
+ * externally as it is not stored in the commit point.
+ *
+ * @param globalCheckpoint the provided global checkpoint
+ * @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
+ * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
+ */
+ public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException {
+ final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
+ return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, userData.entrySet());
+ }
+
final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
throw new AlreadyClosedException("store is already closed");
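[Editor's note: for orientation, a hedged sketch of how this accessor is consumed during peer recovery (see getStartingSeqNo later in this diff); the global checkpoint comes from the translog checkpoint file, not from the Lucene commit, and store/shardPath are assumed to be in scope on the recovery target:

    final long globalCheckpoint = Translog.readGlobalCheckpoint(shardPath.resolveTranslog());
    final SeqNoStats stats = store.loadSeqNoStats(globalCheckpoint);
    // NO_OPS_PERFORMED (-1) if the latest commit carries no local checkpoint key
    final long localCheckpoint = stats.getLocalCheckpoint();
]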
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index fa41824f4de..0765c14cc89 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -778,6 +778,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Source getSource();
+ long seqNo();
+
/**
* Reads the type and the operation from the given stream. The operatino must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
@@ -922,6 +924,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.source;
}
+ @Override
public long seqNo() {
return seqNo;
}
@@ -1072,6 +1075,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.uid;
}
+ @Override
public long seqNo() {
return seqNo;
}
@@ -1147,6 +1151,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final long primaryTerm;
private final String reason;
+ @Override
public long seqNo() {
return seqNo;
}
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index f0d72f6c4c5..2307a711714 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -53,7 +53,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.seqno.GlobalCheckpointService;
+import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -739,7 +739,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
/**
* Notifies the service of the current allocation ids in the cluster state.
- * See {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 894399e851e..23f6356ef6b 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -48,6 +49,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
@@ -59,6 +61,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -124,11 +127,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
/**
- * cancel all ongoing recoveries for the given shard, if their status match a predicate
+ * Cancel all ongoing recoveries for the given shard.
*
- * @param reason reason for cancellation
- * @param shardId shardId for which to cancel recoveries
- * @return true if a recovery was cancelled
+ * @param reason reason for cancellation
+ * @param shardId shard ID for which to cancel recoveries
+ * @return {@code true} if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason);
@@ -152,7 +155,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
retryRecovery(recoveryId, retryAfter, activityTimeout);
}
- private void retryRecovery(final long recoveryId, TimeValue retryAfter, TimeValue activityTimeout) {
+ private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
if (newTarget != null) {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
@@ -166,50 +169,21 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef == null) {
- logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId);
+ logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
}
- RecoveryTarget recoveryTarget = recoveryRef.target();
- assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
-
- logger.trace("collecting local files for {}", recoveryTarget.sourceNode());
- Store.MetadataSnapshot metadataSnapshot;
- try {
- if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
- // we are not going to copy any files, so don't bother listing files, potentially running
- // into concurrency issues with the primary changing files underneath us.
- metadataSnapshot = Store.MetadataSnapshot.EMPTY;
- } else {
- metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
- }
- logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size());
- } catch (org.apache.lucene.index.IndexNotFoundException e) {
- // happens on an empty folder. no need to log
- logger.trace("{} shard folder empty, recover all files", recoveryTarget);
- metadataSnapshot = Store.MetadataSnapshot.EMPTY;
- } catch (IOException e) {
- logger.warn("error while listing local files, recover as if there are none", e);
- metadataSnapshot = Store.MetadataSnapshot.EMPTY;
- } catch (Exception e) {
- // this will be logged as warning later on...
- logger.trace("unexpected error while listing local files, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
- new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
- return;
- }
-
+ final RecoveryTarget recoveryTarget = recoveryRef.target();
+ cancellableThreads = recoveryTarget.cancellableThreads();
+ timer = recoveryTarget.state().getTimer();
try {
+ assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
+ request = getStartRecoveryRequest(recoveryTarget);
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
recoveryTarget.indexShard().prepareForIndexRecovery();
-
- request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
- clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
- cancellableThreads = recoveryTarget.CancellableThreads();
- timer = recoveryTarget.state().getTimer();
- } catch (Exception e) {
+ } catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
+ onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
return;
}
@@ -227,7 +201,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
}).txGet()));
final RecoveryResponse recoveryResponse = responseHolder.get();
- assert responseHolder != null;
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
@@ -286,22 +259,23 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
cause instanceof ShardNotFoundException) {
// if the target is not ready yet, retry
- retryRecovery(recoveryId, "remote shard not ready", recoverySettings.retryDelayStateSync(),
+ retryRecovery(
+ recoveryId,
+ "remote shard not ready",
+ recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof DelayRecoveryException) {
- retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
- recoverySettings.activityTimeout());
+ retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout());
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
recoverySettings.retryDelayNetwork(), cause.getMessage());
- retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
- recoverySettings.activityTimeout());
+ retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout());
return;
}
@@ -310,10 +284,96 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
+
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
}
}
+ /**
+ * Obtains a snapshot of the store metadata for the recovery target.
+ *
+ * @param recoveryTarget the target of the recovery
+ * @return a snapshot of the store metadata
+ */
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+ try {
+ if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
+ // we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the
+ // primary changing files underneath us
+ return Store.MetadataSnapshot.EMPTY;
+ } else {
+ return recoveryTarget.indexShard().snapshotStoreMetadata();
+ }
+ } catch (final org.apache.lucene.index.IndexNotFoundException e) {
+ // happens on an empty folder. no need to log
+ logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
+ return Store.MetadataSnapshot.EMPTY;
+ } catch (final IOException e) {
+ logger.warn("error while listing local files, recovering as if there are none", e);
+ return Store.MetadataSnapshot.EMPTY;
+ }
+ }
+
+ /**
+ * Prepare the start recovery request.
+ *
+ * @param recoveryTarget the target of the recovery
+ * @return a start recovery request
+ */
+ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
+ final StartRecoveryRequest request;
+ logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
+
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+ logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
+
+ final long startingSeqNo;
+ if (metadataSnapshot.size() > 0) {
+ startingSeqNo = getStartingSeqNo(recoveryTarget);
+ } else {
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ }
+
+ if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
+ } else {
+ logger.trace(
+ "{} preparing for sequence-number-based recovery starting at local checkpoint [{}] from [{}]",
+ recoveryTarget.shardId(),
+ startingSeqNo,
+ recoveryTarget.sourceNode());
+ }
+
+ request = new StartRecoveryRequest(
+ recoveryTarget.shardId(),
+ recoveryTarget.sourceNode(),
+ clusterService.localNode(),
+ metadataSnapshot,
+ recoveryTarget.state().getPrimary(),
+ recoveryTarget.recoveryId(),
+ startingSeqNo);
+ return request;
+ }
+
+ /**
+ * Get the starting sequence number for a sequence-number-based recovery request.
+ *
+ * @param recoveryTarget the target of the recovery
+ * @return the starting sequence number or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
+ * failed
+ */
+ public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
+ try {
+ final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog());
+ return recoveryTarget.store().loadSeqNoStats(globalCheckpoint).getLocalCheckpoint() + 1;
+ } catch (final IOException e) {
+ // this can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
+ // translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
+ // proceeds to attempt a sequence-number-based recovery
+ return SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ }
+ }
+
public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);
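[Editor's note: condensing the control flow added above, a hedged sketch of how the starting sequence number selects the recovery flavor (names as in getStartRecoveryRequest and getStartingSeqNo above):

    // UNASSIGNED_SEQ_NO forces a file-based recovery; any non-negative value attempts a
    // sequence-number-based recovery that replays the translog from localCheckpoint + 1.
    final long startingSeqNo = metadataSnapshot.size() > 0
            ? getStartingSeqNo(recoveryTarget)             // local checkpoint + 1, or UNASSIGNED_SEQ_NO on I/O failure
            : SequenceNumbersService.UNASSIGNED_SEQ_NO;    // empty store: nothing local to resume from
]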
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
index aed23256108..4fb2e398e52 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
@@ -82,14 +83,13 @@ public class RecoveriesCollection {
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout));
}
-
/**
* Resets the recovery and performs a recovery restart on the currently recovering index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created RecoveryTarget
*/
- public RecoveryTarget resetRecovery(final long recoveryId, TimeValue activityTimeout) {
+ public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
RecoveryTarget oldRecoveryTarget = null;
final RecoveryTarget newRecoveryTarget;
@@ -107,7 +107,7 @@ public class RecoveriesCollection {
}
// Closes the current recovery target
- boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.CancellableThreads());
+ boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.cancellableThreads());
if (successfulReset) {
logger.trace("{} restarted recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(),
newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId());
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 2f74bd0fbd4..5ff4ce9b09b 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -41,6 +41,8 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.RecoveryEngineException;
+import org.elasticsearch.index.seqno.LocalCheckpointTracker;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
@@ -124,47 +126,58 @@ public class RecoverySourceHandler {
* performs the recovery from the local engine to the target
*/
public RecoveryResponse recoverToTarget() throws IOException {
- try (Translog.View translogView = shard.acquireTranslogView()) {
- logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
- final IndexCommit phase1Snapshot;
- try {
- phase1Snapshot = shard.acquireIndexCommit(false);
- } catch (Exception e) {
- IOUtils.closeWhileHandlingException(translogView);
- throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
- }
+ try (final Translog.View translogView = shard.acquireTranslogView()) {
+ logger.trace("{} captured translog id [{}] for recovery", shard.shardId(), translogView.minTranslogGeneration());
- try {
- phase1(phase1Snapshot, translogView);
- } catch (Exception e) {
- throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
- } finally {
+ boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO &&
+ isTranslogReadyForSequenceNumberBasedRecovery(translogView);
+
+ if (!isSequenceNumberBasedRecoveryPossible) {
+ final IndexCommit phase1Snapshot;
try {
- shard.releaseIndexCommit(phase1Snapshot);
- } catch (IOException ex) {
- logger.warn("releasing snapshot caused exception", ex);
+ phase1Snapshot = shard.acquireIndexCommit(false);
+ } catch (final Exception e) {
+ IOUtils.closeWhileHandlingException(translogView);
+ throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
+ }
+ try {
+ phase1(phase1Snapshot, translogView);
+ } catch (final Exception e) {
+ throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
+ } finally {
+ try {
+ shard.releaseIndexCommit(phase1Snapshot);
+ } catch (final IOException ex) {
+ logger.warn("releasing snapshot caused exception", ex);
+ }
}
}
- // engine was just started at the end of phase 1
+ try {
+ prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
+ } catch (final Exception e) {
+ throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
+ }
+
+ // engine was just started at the end of phase1
if (shard.state() == IndexShardState.RELOCATED) {
assert request.isPrimaryRelocation() == false :
"recovery target should not retry primary relocation if previous attempt made it past finalization step";
- /**
+ /*
* The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all
* operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the
- * local translog and thus will be resent on phase 2. The reason is that an operation replicated by the target primary is
+ * local translog and thus will be resent on phase2. The reason is that an operation replicated by the target primary is
* sent to the recovery target and the local shard (old primary) concurrently, meaning it may have arrived at the recovery
* target before we opened the engine and is still in-flight on the local shard.
*
* Checking the relocated status here, after we opened the engine on the target, is safe because primary relocation waits
* for all ongoing operations to complete and be fully replicated. Therefore all future operation by the new primary are
- * guaranteed to reach the target shard when it's engine is open.
+ * guaranteed to reach the target shard when its engine is open.
*/
throw new IndexShardRelocatedException(request.shardId());
}
- logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations());
+ logger.trace("{} snapshot translog for recovery; current size is [{}]", shard.shardId(), translogView.totalOperations());
try {
phase2(translogView.snapshot());
} catch (Exception e) {
@@ -176,6 +189,49 @@ public class RecoverySourceHandler {
return response;
}
+ /**
+ * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
+ * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
+ *
+ * @param translogView a view of the translog on the source
+ * @return {@code true} if the source is ready for a sequence-number-based recovery
+ * @throws IOException if an I/O exception occurred reading the translog snapshot
+ */
+ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException {
+ final long startingSeqNo = request.startingSeqNo();
+ assert startingSeqNo >= 0;
+ final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+ logger.trace("{} starting: [{}], ending: [{}]", shard.shardId(), startingSeqNo, endingSeqNo);
+ // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
+ if (startingSeqNo - 1 <= endingSeqNo) {
+ logger.trace(
+ "{} waiting for all operations in the range [{}, {}] to complete",
+ shard.shardId(),
+ startingSeqNo,
+ endingSeqNo);
+ /*
+ * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+ * operations in the required range will be available for replaying from the translog of the source.
+ */
+ cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+
+ final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
+ final Translog.Snapshot snapshot = translogView.snapshot();
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ tracker.markSeqNoAsCompleted(operation.seqNo());
+ }
+ }
+ return tracker.getCheckpoint() >= endingSeqNo;
+ } else {
+ // norelease this can currently happen if a snapshot restore rolls the primary back to a previous commit point; in this
+ // situation the local checkpoint on the replica can be far in advance of the maximum sequence number on the primary violating
+ // all assumptions regarding local and global checkpoints
+ return false;
+ }
+ }
+
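[Editor's note: to make the gap check concrete, a small worked example under hypothetical values (indexSettings stands in for the shard's settings): seeding the tracker at startingSeqNo - 1 and marking each sequence number seen in the translog means the checkpoint only reaches endingSeqNo when the range [startingSeqNo, endingSeqNo] has no holes:

    // startingSeqNo = 3, so the tracker starts with maxSeqNo = 3 and checkpoint = 2
    final LocalCheckpointTracker tracker = new LocalCheckpointTracker(indexSettings, 3, 2);
    tracker.markSeqNoAsCompleted(3);
    tracker.markSeqNoAsCompleted(5);       // operation 4 is missing from the translog
    assert tracker.getCheckpoint() == 3;   // stuck below the gap
    // with endingSeqNo = 5, getCheckpoint() >= endingSeqNo is false: fall back to file-based recovery
]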
/**
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
@@ -237,7 +293,7 @@ public class RecoverySourceHandler {
response.phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
- logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}]," +
+ logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exist in local store and has checksum [{}]," +
" size [{}]",
indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length());
}
@@ -252,7 +308,7 @@ public class RecoverySourceHandler {
"[{}], local [{}]",
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
} else {
- logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote",
+ logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exist in remote",
indexName, shardId, request.targetNode(), md.name());
}
response.phase1FileNames.add(md.name());
@@ -329,8 +385,6 @@ public class RecoverySourceHandler {
}
}
- prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
-
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Exception e) {
@@ -340,14 +394,12 @@ public class RecoverySourceHandler {
}
}
-
- protected void prepareTargetForTranslog(final int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
+ void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
final long startEngineStart = stopWatch.totalTime().millis();
- // Send a request preparing the new shard's translog to receive
- // operations. This ensures the shard engine is started and disables
- // garbage collection (not the JVM's GC!) of tombstone deletes
+ // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
+ // garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
stopWatch.stop();
@@ -357,31 +409,34 @@ public class RecoverySourceHandler {
}
/**
- * Perform phase2 of the recovery process
+ * Perform phase two of the recovery process.
*
- * Phase2 takes a snapshot of the current translog *without* acquiring the
- * write lock (however, the translog snapshot is a point-in-time view of
- * the translog). It then sends each translog operation to the target node
- * so it can be replayed into the new shard.
+ * Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is a
+ * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
+ * shard.
+ *
+ * @param snapshot a snapshot of the translog
*/
- public void phase2(Translog.Snapshot snapshot) {
+ void phase2(final Translog.Snapshot snapshot) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
- StopWatch stopWatch = new StopWatch().start();
+ final StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
- // Send all the snapshot's translog operations to the target
- int totalOperations = sendSnapshot(snapshot);
+
+ // send all the snapshot's translog operations to the target
+ final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot);
+
stopWatch.stop();
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
}
- /**
+ /*
* finalizes the recovery process
*/
public void finalizeRecovery() {
@@ -410,7 +465,7 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
}
- /**
+ /*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
@@ -421,77 +476,73 @@ public class RecoverySourceHandler {
}
/**
- * Send the given snapshot's operations to this handler's target node.
+ * Send the given snapshot's operations with a sequence number at or above the specified starting sequence number to this handler's
+ * target node.
*
- * Operations are bulked into a single request depending on an operation
- * count limit or size-in-bytes limit
+ * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
+ * @param startingSeqNo only operations with a sequence number at or above this value will be sent
+ * @param snapshot the translog snapshot to replay operations from
* @return the total number of translog operations that were sent
+ * @throws IOException if an I/O exception occurred reading the translog snapshot
*/
- protected int sendSnapshot(final Translog.Snapshot snapshot) {
+ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
int ops = 0;
long size = 0;
int totalOperations = 0;
final List<Translog.Operation> operations = new ArrayList<>();
- Translog.Operation operation;
- try {
- operation = snapshot.next(); // this ex should bubble up
- } catch (IOException ex) {
- throw new ElasticsearchException("failed to get next operation from translog", ex);
+
+ if (snapshot.totalOperations() == 0) {
+ logger.trace("[{}][{}] no translog operations to send to {}", indexName, shardId, request.targetNode());
}
- if (operation == null) {
- logger.trace("[{}][{}] no translog operations to send to {}",
- indexName, shardId, request.targetNode());
- }
- while (operation != null) {
+ // send operations in batches
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
+ // we have to skip ops for which no sequence number was assigned, and any ops below the starting sequence number
+ if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
operations.add(operation);
- ops += 1;
+ ops++;
size += operation.estimateSize();
totalOperations++;
- // Check if this request is past bytes threshold, and
- // if so, send it off
+ // check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
-
- // don't throttle translog, since we lock for phase3 indexing,
- // so we need to move it as fast as possible. Note, since we
- // index docs to replicas while the index files are recovered
- // the lock can potentially be removed, in which case, it might
- // make sense to re-enable throttling in this phase
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
- indexName, shardId, ops, new ByteSizeValue(size),
- snapshot.totalOperations(),
- request.targetNode());
+ indexName,
+ shardId,
+ ops,
+ new ByteSizeValue(size),
+ snapshot.totalOperations(),
+ request.targetNode());
}
-
ops = 0;
size = 0;
operations.clear();
}
- try {
- operation = snapshot.next(); // this ex should bubble up
- } catch (IOException ex) {
- throw new ElasticsearchException("failed to get next operation from translog", ex);
- }
}
- // send the leftover
+
+ // send the leftover operations
if (!operations.isEmpty()) {
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
-
}
+
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}",
- indexName, shardId, ops, new ByteSizeValue(size),
- snapshot.totalOperations(),
- request.targetNode());
+ indexName,
+ shardId,
+ ops,
+ new ByteSizeValue(size),
+ snapshot.totalOperations(),
+ request.targetNode());
}
+
return totalOperations;
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 00dd019aac4..d9886efa07b 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -49,6 +49,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -97,17 +98,19 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
/**
- * creates a new recovery target object that represents a recovery to the provided indexShard
+ * Creates a new recovery target object that represents a recovery to the provided shard.
*
- * @param indexShard local shard where we want to recover to
- * @param sourceNode source node of the recovery where we recover from
- * @param listener called when recovery is completed / failed
+ * @param indexShard local shard where we want to recover to
+ * @param sourceNode source node of the recovery where we recover from
+ * @param listener called when recovery is completed/failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
- * version. Necessary for primary relocation so that new primary knows about all other ongoing
- * replica recoveries when replicating documents (see {@link RecoverySourceHandler}).
+ * version; necessary for primary relocation so that new primary knows about all other ongoing
+ * replica recoveries when replicating documents (see {@link RecoverySourceHandler})
*/
- public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
- Callback ensureClusterStateVersionCallback) {
+ public RecoveryTarget(final IndexShard indexShard,
+ final DiscoveryNode sourceNode,
+ final PeerRecoveryTargetService.RecoveryListener listener,
+ final Callback ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@@ -125,10 +128,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
/**
- * returns a fresh RecoveryTarget to retry recovery from the same source node onto the same IndexShard and using the same listener
+ * Returns a fresh recovery target to retry recovery from the same source node onto the same shard and using the same listener.
+ *
+ * @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
- return new RecoveryTarget(this.indexShard, this.sourceNode, this.listener, this.ensureClusterStateVersionCallback);
+ return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
}
public long recoveryId() {
@@ -152,7 +157,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
return indexShard.recoveryState();
}
- public CancellableThreads CancellableThreads() {
+ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}
@@ -220,7 +225,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
*
- * if {@link #CancellableThreads()} was used, the threads will be interrupted.
+ * if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
index 509dd996d19..802576922f9 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
@@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
@@ -29,17 +30,16 @@ import java.util.function.Function;
import java.util.function.Supplier;
/**
- * A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
- * as relocated an closed to ensure that the engine is closed and the target can acquire the IW write lock.
+ * A recovery handler that skips phase one as well as sending the translog snapshot.
*/
public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
- public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
- Supplier<Long> currentClusterStateVersionSupplier,
- Function<String, Releasable> delayNewRecoveries, Logger logger) {
+ SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
+ Supplier<Long> currentClusterStateVersionSupplier,
+ Function<String, Releasable> delayNewRecoveries, Logger logger) {
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger);
this.shard = shard;
this.request = request;
@@ -49,8 +49,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
public RecoveryResponse recoverToTarget() throws IOException {
boolean engineClosed = false;
try {
- logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
- long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
+ logger.trace("{} recovery [phase1] to {}: skipping phase1 for shared filesystem", request.shardId(), request.targetNode());
+ final long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
if (request.isPrimaryRelocation()) {
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
try {
@@ -83,9 +83,9 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
}
@Override
- protected int sendSnapshot(Translog.Snapshot snapshot) {
- logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}",
- shard.shardId(), request.targetNode());
+ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
+ logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode());
return 0;
}
+
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
index bc8a73b5622..46a29d65464 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
@@ -19,46 +19,60 @@
package org.elasticsearch.indices.recovery;
+import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
+/**
+ * Represents a request for starting a peer recovery.
+ */
public class StartRecoveryRequest extends TransportRequest {
private long recoveryId;
-
private ShardId shardId;
-
private DiscoveryNode sourceNode;
-
private DiscoveryNode targetNode;
-
private Store.MetadataSnapshot metadataSnapshot;
-
private boolean primaryRelocation;
+ private long startingSeqNo;
public StartRecoveryRequest() {
}
/**
- * Start recovery request.
+ * Construct a request for starting a peer recovery.
*
- * @param sourceNode The node to recover from
- * @param targetNode The node to recover to
+ * @param shardId the shard ID to recover
+ * @param sourceNode the source node to recover from
+ * @param targetNode the target node to recover to
+ * @param metadataSnapshot the Lucene metadata
+ * @param primaryRelocation whether or not the recovery is a primary relocation
+ * @param recoveryId the recovery ID
+ * @param startingSeqNo the starting sequence number
*/
- public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Store.MetadataSnapshot metadataSnapshot, boolean primaryRelocation, long recoveryId) {
+ public StartRecoveryRequest(final ShardId shardId,
+ final DiscoveryNode sourceNode,
+ final DiscoveryNode targetNode,
+ final Store.MetadataSnapshot metadataSnapshot,
+ final boolean primaryRelocation,
+ final long recoveryId,
+ final long startingSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.metadataSnapshot = metadataSnapshot;
this.primaryRelocation = primaryRelocation;
+ this.startingSeqNo = startingSeqNo;
}
public long recoveryId() {
@@ -85,6 +99,10 @@ public class StartRecoveryRequest extends TransportRequest {
return metadataSnapshot;
}
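+
+ /**
+ * Returns the starting sequence number; {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} indicates that the recovery is not
+ * sequence-number-based.
+ *
+ * @return the starting sequence number
+ */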
+ public long startingSeqNo() {
+ return startingSeqNo;
+ }
+
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -94,6 +112,11 @@ public class StartRecoveryRequest extends TransportRequest {
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
primaryRelocation = in.readBoolean();
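+ // nodes before 6.0.0-alpha1 do not send a starting sequence number; default to unassigned, which indicates that a
+ // sequence-number-based recovery is not possible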
+ if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+ startingSeqNo = in.readLong();
+ } else {
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ }
}
@Override
@@ -105,6 +128,9 @@ public class StartRecoveryRequest extends TransportRequest {
targetNode.writeTo(out);
metadataSnapshot.writeTo(out);
out.writeBoolean(primaryRelocation);
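+ // nodes before 6.0.0-alpha1 do not understand the starting sequence number, so do not send it to them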
+ if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
+ out.writeLong(startingSeqNo);
+ }
}
}
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index 431b592fac9..c97c0982ebb 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -26,8 +26,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
-import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -45,7 +43,6 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
-import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
import java.nio.file.DirectoryStream;
@@ -55,7 +52,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.stream.IntStream;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -384,105 +383,92 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue());
}
- public void testReusePeerRecovery() throws Exception {
- final Settings settings = Settings.builder()
- .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
- .put("gateway.recover_after_nodes", 4)
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
- .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
+ public void testReuseInFileBasedPeerRecovery() throws Exception {
+ internalCluster().startMasterOnlyNode();
+ final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0));
- internalCluster().startNodes(4, settings);
- // prevent any rebalance actions during the peer recovery
- // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
- // we reuse the files on disk after full restarts for replicas.
- assertAcked(prepareCreate("test").setSettings(Settings.builder()
- .put(indexSettings())
- .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
- ensureGreen();
- logger.info("--> indexing docs");
- for (int i = 0; i < 1000; i++) {
- client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
- if ((i % 200) == 0) {
- client().admin().indices().prepareFlush().execute().actionGet();
- }
- }
- if (randomBoolean()) {
- client().admin().indices().prepareFlush().execute().actionGet();
- }
- logger.info("Running Cluster Health");
- ensureGreen();
- client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
- client().admin().indices().prepareFlush().setForce(true).get();
-
- boolean useSyncIds = randomBoolean();
- if (useSyncIds == false) {
- logger.info("--> disabling allocation while the cluster is shut down");
-
- // Disable allocations while we are closing nodes
- client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder()
- .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
- .get();
- logger.info("--> full cluster restart");
- internalCluster().fullRestart();
-
- logger.info("--> waiting for cluster to return to green after first shutdown");
- ensureGreen();
- } else {
- logger.info("--> trying to sync flush");
- assertEquals(client().admin().indices().prepareSyncedFlush("test").get().failedShards(), 0);
- assertSyncIdsNotNull();
- }
-
- logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
- // Disable allocations while we are closing nodes
- client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder()
- .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
+ // create the index with our mapping
+ client(primaryNode)
+ .admin()
+ .indices()
+ .prepareCreate("test")
+ .setSettings(Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 1))
.get();
- Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
-
- logger.info("--> full cluster restart");
- internalCluster().fullRestart();
-
- logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
- ensureGreen();
- primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
-
- if (useSyncIds) {
- assertSyncIdsNotNull();
+ logger.info("--> indexing docs");
+ for (int i = 0; i < randomIntBetween(1, 1024); i++) {
+ client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}
- RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
- for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
+
+ client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
+
+ // start the replica node; we do this after indexing so a file-based recovery is triggered to ensure the files are identical
+ final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1));
+ ensureGreen();
+
+ final RecoveryResponse initialRecoveryResponse = client().admin().indices().prepareRecoveries("test").get();
+ final Set<String> files = new HashSet<>();
+ for (final RecoveryState recoveryState : initialRecoveryResponse.shardRecoveryStates().get("test")) {
+ if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
+ for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+ files.add(file.name());
+ }
+ break;
+ }
+ }
+
+ logger.info("--> restart replica node");
+
+ internalCluster().restartNode(replicaNode, new RestartCallback() {
+ @Override
+ public Settings onNodeStopped(String nodeName) throws Exception {
+ // index some more documents; we expect to reuse the files that already exist on the replica
+ for (int i = 0; i < randomIntBetween(1, 1024); i++) {
+ client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
+ }
+
+ // prevent a sequence-number-based recovery from being possible
+ client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
+ return super.onNodeStopped(nodeName);
+ }
+ });
+
+ ensureGreen();
+
+ final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
+ for (final RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
long recovered = 0;
- for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
- if (file.name().startsWith("segments")) {
+ long reused = 0;
+ int filesRecovered = 0;
+ int filesReused = 0;
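+ // classify each file in this recovery by whether it already existed on the replica before the restart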
+ for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+ if (files.contains(file.name()) == false) {
recovered += file.length();
+ filesRecovered++;
+ } else {
+ reused += file.length();
+ filesReused++;
}
}
- if (!recoveryState.getPrimary() && (useSyncIds == false)) {
- logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
- recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
- recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
- assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
- assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
- // we have to recover the segments file since we commit the translog ID on engine startup
- assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
- assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1));
- assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - 1));
- assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
- } else {
- if (useSyncIds && !recoveryState.getPrimary()) {
- logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
- recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
- recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
- }
+ if (recoveryState.getPrimary()) {
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
+ } else {
+ logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
+ recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
+ recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
+ assertThat("bytes should have been recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
+ assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
+ // we have to recover the segments file since we commit the translog ID on engine startup
+ assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(), equalTo(reused));
+ assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
+ assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(filesRecovered));
+ assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
+ assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
+ assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
+ assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
}
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 97638b53e37..7c586b204b3 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -110,6 +110,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@@ -648,14 +649,14 @@ public class InternalEngineTests extends ESTestCase {
assertThat(stats1.getGeneration(), greaterThan(0L));
assertThat(stats1.getId(), notNullValue());
assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
- assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY));
+ assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assertThat(
- Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
+ Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
- assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
+ assertThat(stats1.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO));
assertThat(
- Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
+ Long.parseLong(stats1.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
@@ -677,9 +678,9 @@ public class InternalEngineTests extends ESTestCase {
stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY),
not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY)));
- assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
- assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
- assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
+ assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
+ assertThat(stats2.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO));
+ assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally {
IOUtils.close(engine);
}
@@ -1772,14 +1773,14 @@ public class InternalEngineTests extends ESTestCase {
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
assertThat(
- Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
+ Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint));
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
assertThat(
initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
assertThat(
- Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
+ Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(maxSeqNo));
} finally {
@@ -1793,13 +1794,13 @@ public class InternalEngineTests extends ESTestCase {
assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
assertThat(
- Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
+ Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
assertThat(
- Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
+ Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert
// that the committed max seq no is equivalent to what the current primary seq no is, as all data
// we have assigned sequence numbers to should be in the commit
@@ -1861,11 +1862,11 @@ public class InternalEngineTests extends ESTestCase {
long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
Map<String, String> userData = commit.getUserData();
- long localCheckpoint = userData.containsKey(InternalEngine.LOCAL_CHECKPOINT_KEY) ?
- Long.parseLong(userData.get(InternalEngine.LOCAL_CHECKPOINT_KEY)) :
+ long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ?
+ Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
SequenceNumbersService.NO_OPS_PERFORMED;
- long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
- Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
+ long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ?
+ Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) :
SequenceNumbersService.UNASSIGNED_SEQ_NO;
// local checkpoint and max seq no shouldn't go backwards
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 042eb85cf36..08d854acdc3 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -20,6 +20,7 @@
package org.elasticsearch.index.replication;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
@@ -35,8 +36,11 @@ import org.elasticsearch.action.support.replication.TransportWriteActionTestHelp
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@@ -46,6 +50,7 @@ import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
@@ -169,14 +174,43 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
updateAllocationIDsOnPrimary();
}
+ private final Runnable replicaGlobalCheckpointSyncer = () -> {
+ throw new AssertionError("replicas can not sync global checkpoint");
+ };
+
public synchronized IndexShard addReplica() throws IOException {
- final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData,
- () -> { throw new AssertionError("replicas can't sync global checkpoint"); }, null);
+ final IndexShard replica =
+ newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, replicaGlobalCheckpointSyncer, null);
replicas.add(replica);
updateAllocationIDsOnPrimary();
return replica;
}
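+ /**
+ * Creates a new initializing replica on top of an existing shard path so that on-disk state left behind by a previously closed
+ * shard copy is picked up during recovery.
+ *
+ * @param shardPath the path to the existing on-disk shard data
+ * @param nodeId the ID of the node on which to create the replica
+ * @return the new replica
+ */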
+ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
+ final ShardRouting shardRouting = TestShardRouting.newShardRouting(
+ shardId,
+ nodeId,
+ false,
+ ShardRoutingState.INITIALIZING,
+ RecoverySource.PeerRecoverySource.INSTANCE);
+
+ final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, replicaGlobalCheckpointSyncer);
+ replicas.add(newReplica);
+ updateAllocationIDsOnPrimary();
+ return newReplica;
+ }
+
+ public synchronized List<IndexShard> getReplicas() {
+ return Collections.unmodifiableList(replicas);
+ }
+
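+ /**
+ * Removes the given replica from the replication group without closing it; if it was removed, the allocation IDs on the primary
+ * are updated.
+ */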
+ synchronized boolean removeReplica(IndexShard replica) {
+ final boolean removed = replicas.remove(replica);
+ if (removed) {
+ updateAllocationIDsOnPrimary();
+ }
+ return removed;
+ }
+
public void recoverReplica(IndexShard replica) throws IOException {
recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}));
}
@@ -186,8 +220,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
recoverReplica(replica, targetSupplier, true);
}
- public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
- boolean markAsRecovering) throws IOException {
+ public void recoverReplica(
+ IndexShard replica,
+ BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
+ boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
updateAllocationIDsOnPrimary();
}
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 93b20633cf1..ea603e100c7 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -20,6 +20,8 @@
package org.elasticsearch.index.replication;
import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
@@ -34,6 +36,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
public void testIndexingDuringFileRecovery() throws Exception {
@@ -57,11 +63,77 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
+ public void testRecoveryOfDisconnectedReplica() throws Exception {
+ try (final ReplicationGroup shards = createGroup(1)) {
+ shards.startAll();
+ int docs = shards.indexDocs(randomInt(50));
+ shards.flush();
+ shards.getPrimary().updateGlobalCheckpointOnPrimary();
+ final IndexShard originalReplica = shards.getReplicas().get(0);
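+ // sequence numbers are zero-based, so with docs documents indexed and flushed the committed local checkpoint is docs - 1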
+ long replicaCommittedLocalCheckpoint = docs - 1;
+ boolean replicaHasDocsSinceLastFlushedCheckpoint = false;
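+ // randomly index, flush, and sync the global checkpoint for a few rounds, tracking the replica's last committed local checkpoint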
+ for (int i = 0; i < randomInt(2); i++) {
+ final int indexedDocs = shards.indexDocs(randomInt(5));
+ docs += indexedDocs;
+ if (indexedDocs > 0) {
+ replicaHasDocsSinceLastFlushedCheckpoint = true;
+ }
+
+ final boolean flush = randomBoolean();
+ if (flush) {
+ originalReplica.flush(new FlushRequest());
+ replicaHasDocsSinceLastFlushedCheckpoint = false;
+ replicaCommittedLocalCheckpoint = docs - 1;
+ }
+
+ final boolean sync = randomBoolean();
+ if (sync) {
+ shards.getPrimary().updateGlobalCheckpointOnPrimary();
+ }
+ }
+
+ shards.removeReplica(originalReplica);
+
+ final int missingOnReplica = shards.indexDocs(randomInt(5));
+ docs += missingOnReplica;
+ replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0;
+
+ if (randomBoolean()) {
+ shards.getPrimary().updateGlobalCheckpointOnPrimary();
+ }
+
+ final boolean flushPrimary = randomBoolean();
+ if (flushPrimary) {
+ shards.flush();
+ }
+
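+ // simulate a disconnect: close the replica and its store, leaving the on-disk state in place to be reused on recovery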
+ originalReplica.close("disconnected", false);
+ IOUtils.close(originalReplica.store());
+ final IndexShard recoveredReplica =
+ shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId());
+ shards.recoverReplica(recoveredReplica);
+ if (flushPrimary && replicaHasDocsSinceLastFlushedCheckpoint) {
+ // replica has something to catch up with, but since we flushed the primary, we should fall back to full recovery
+ assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
+ } else {
+ assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
+ assertThat(
+ recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
+ equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1))));
+ }
+
+ docs += shards.indexDocs(randomInt(5));
+
+ shards.assertAllEqual(docs);
+ }
+ }
+
private static class BlockingTarget extends RecoveryTarget {
+
private final CountDownLatch recoveryBlocked;
private final CountDownLatch releaseRecovery;
private final RecoveryState.Stage stageToBlock;
- public static final EnumSet<RecoveryState.Stage> SUPPORTED_STAGES =
+ static final EnumSet<RecoveryState.Stage> SUPPORTED_STAGES =
EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE);
private final Logger logger;
@@ -119,4 +191,5 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
+
}
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
index 8d8be2e402d..7d6dd25403b 100644
--- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
+++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
@@ -42,19 +42,22 @@ import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTests extends ESTestCase {
- GlobalCheckpointService checkpointService;
+ GlobalCheckpointTracker tracker;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0),
- IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO);
+ tracker =
+ new GlobalCheckpointTracker(
+ new ShardId("test", "_na_", 0),
+ IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+ UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
- assertFalse("checkpoint shouldn't be updated when the are no active shards", checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+ assertFalse("checkpoint shouldn't be updated when the are no active shards", tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
@@ -81,7 +84,7 @@ public class GlobalCheckpointTests extends ESTestCase {
// it is however nice not to assume this on this level and check we do the right thing.
final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
- assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+ assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().forEach(aId -> {
@@ -96,42 +99,42 @@ public class GlobalCheckpointTests extends ESTestCase {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
- checkpointService.updateAllocationIdsFromMaster(active, initializing);
- initializing.forEach(aId -> checkpointService.markAllocationIdAsInSync(aId));
- allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
+ tracker.updateAllocationIdsFromMaster(active, initializing);
+ initializing.forEach(aId -> tracker.markAllocationIdAsInSync(aId));
+ allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
- assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+ assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
- assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
- assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
+ assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
+ assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
- allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
+ allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/in-sync id, the checkpoint shouldn't change but a refresh should be requested.
final String extraId = "extra_" + randomAsciiOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
- checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
- assertThat(checkpointService.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
+ tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
+ assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
- checkpointService.updateAllocationIdsFromMaster(newActive, initializing);
+ tracker.updateAllocationIdsFromMaster(newActive, initializing);
// we should ask for a refresh, but not update the checkpoint
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// now notify for the new id
- checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
+ tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
// now it should be incremented
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), greaterThan(maxLocalCheckpoint));
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
@@ -140,60 +143,60 @@ public class GlobalCheckpointTests extends ESTestCase {
final Map<String, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
- checkpointService.updateAllocationIdsFromMaster(
+ tracker.updateAllocationIdsFromMaster(
new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())),
initializing.keySet());
- randomSubsetOf(initializing.keySet()).forEach(checkpointService::markAllocationIdAsInSync);
- assigned.forEach(checkpointService::updateLocalCheckpoint);
+ randomSubsetOf(initializing.keySet()).forEach(tracker::markAllocationIdAsInSync);
+ assigned.forEach(tracker::updateLocalCheckpoint);
// now mark all active shards
- checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
+ tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
// global checkpoint can't be advanced, but we need a sync
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
- assigned.forEach(checkpointService::updateLocalCheckpoint);
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
+ assigned.forEach(tracker::updateLocalCheckpoint);
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
- checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
- initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
+ tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
+ initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
randomSubsetOf(randomInt(initializing.size() - 1),
- initializing.keySet()).forEach(aId -> checkpointService.updateLocalCheckpoint(aId, initializing.get(aId)));
+ initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
- active.forEach(checkpointService::updateLocalCheckpoint);
+ active.forEach(tracker::updateLocalCheckpoint);
// global checkpoint can't be advanced, but we need a sync
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
- initializing.forEach(checkpointService::updateLocalCheckpoint);
- assertTrue(checkpointService.updateCheckpointOnPrimary());
- assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
+ initializing.forEach(tracker::updateLocalCheckpoint);
+ assertTrue(tracker.updateCheckpointOnPrimary());
+ assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
- checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
- initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
- nonApproved.keySet().forEach(checkpointService::markAllocationIdAsInSync);
+ tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
+ initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
+ nonApproved.keySet().forEach(tracker::markAllocationIdAsInSync);
List