diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java
index e8aa0cdeb89..68eaf86f4f8 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
@@ -31,95 +32,94 @@ import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
- * A shard component that is responsible of tracking the global checkpoint. The global checkpoint
- * is the highest seq_no for which all lower (or equal) seq_no have been processed on all shards that
- * are currently active. Since shards count as "active" when the master starts them, and before this primary shard
- * has been notified of this fact, we also include shards that have completed recovery. These shards have received
- * all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set
- * of shards that are taken into account for the global checkpoint calculation are called the "in sync" shards.
- *
+ * A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
+ * all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
+ * master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
+ * These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
+ * The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
*
- * The global checkpoint is maintained by the primary shard and is replicated to all the replicas
- * (via {@link GlobalCheckpointSyncAction}).
+ * The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
-
- /**
- * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up
- * to speed through recovery. These shards are treated as valid copies and participate in determining the global
- * checkpoint.
- *
- * Keyed by allocation ids.
+ /*
+ * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
+ * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
+ * allocation IDs. All accesses to this set are guarded by a lock on this.
*/
- private final ObjectLongMap inSyncLocalCheckpoints; // keyed by allocation ids
+ private final ObjectLongMap inSyncLocalCheckpoints;
- /**
- * This set holds the last set of known valid allocation ids as received by the master. This is important to make sure
- * shard that are failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global
- * checkpoint back
+ /*
+ * This set holds the last set of known valid allocation ids as received by the master. This is important to make sure shard that are
+ * failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global checkpoint back. All accesses to
+ * this set are guarded by a lock on this.
*/
private final Set assignedAllocationIds;
+ /*
+ * The current global checkpoint for this shard. Note that this field is guarded by a lock on this and thus this field does not need to
+ * be volatile.
+ */
private long globalCheckpoint;
/**
- * Initialize the global checkpoint service. The {@code globalCheckpoint}
- * should be set to the last known global checkpoint for this shard, or
- * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
+ * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
+ * shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
- * @param shardId the shard this service is providing tracking
- * local checkpoints for
+ * @param shardId the shard this service is tracking local checkpoints for
* @param indexSettings the index settings
- * @param globalCheckpoint the last known global checkpoint for this shard,
- * or
- * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
+ * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
- assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint:" + globalCheckpoint;
+ assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
}
/**
- * Notifies the service of a local checkpoint. If the checkpoint is lower than the currently known one,
- * this is a noop. Last, if the allocation id is not in sync, it is ignored. This to prevent late
- * arrivals from shards that are removed to be re-added.
+ * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. If the checkpoint is lower than
+ * the currently known one, this is a no-op. If the allocation ID is not in sync, it is ignored. This is to prevent late arrivals from
+ * shards that are removed to be re-added.
+ *
+ * @param allocationId the allocation ID of the shard to update the local checkpoint for
+ * @param checkpoint the local checkpoint for the shard
*/
- public synchronized void updateLocalCheckpoint(String allocationId, long localCheckpoint) {
+ public synchronized void updateLocalCheckpoint(final String allocationId, final long checkpoint) {
final int indexOfKey = inSyncLocalCheckpoints.indexOf(allocationId);
if (indexOfKey >= 0) {
final long current = inSyncLocalCheckpoints.indexGet(indexOfKey);
-
- if (current < localCheckpoint) {
- inSyncLocalCheckpoints.indexReplace(indexOfKey, localCheckpoint);
+ if (current < checkpoint) {
+ inSyncLocalCheckpoints.indexReplace(indexOfKey, checkpoint);
if (logger.isTraceEnabled()) {
- logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, localCheckpoint, current);
+ logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, checkpoint, current);
}
} else {
- logger.trace("skipping update of local checkpoint [{}], current checkpoint is higher " +
- "(current [{}], incoming [{}], type [{}])",
- allocationId, current, localCheckpoint, allocationId);
+ logger.trace(
+ "skipping update of local checkpoint [{}], current checkpoint is higher (current [{}], incoming [{}], type [{}])",
+ allocationId,
+ current,
+ checkpoint,
+ allocationId);
}
} else {
- logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, localCheckpoint);
+ logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, checkpoint);
}
}
/**
- * Scans through the currently known local checkpoints and updates the global checkpoint accordingly.
+ * Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
- * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
- * of one of the active allocations is not known.
+ * @return {@code true} if the checkpoint has been updated or if it can not be updated since one of the local checkpoints of one of the
+ * active allocations is not known.
*/
synchronized boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (inSyncLocalCheckpoints.isEmpty()) {
return false;
}
- for (ObjectLongCursor cp : inSyncLocalCheckpoints) {
+ for (final ObjectLongCursor cp : inSyncLocalCheckpoints) {
if (cp.value == UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key);
return true;
@@ -139,36 +139,39 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
}
/**
- * gets the current global checkpoint. See java docs for {@link GlobalCheckpointService} for more details
+ * Returns the global checkpoint for the shard.
+ *
+ * @return the global checkpoint
*/
public synchronized long getCheckpoint() {
return globalCheckpoint;
}
/**
- * updates the global checkpoint on a replica shard (after it has been updated by the primary).
+ * Updates the global checkpoint on a replica shard after it has been updated by the primary.
+ *
+ * @param checkpoint the global checkpoint
*/
- synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
+ synchronized void updateCheckpointOnReplica(final long checkpoint) {
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
*/
- if (this.globalCheckpoint <= globalCheckpoint) {
- this.globalCheckpoint = globalCheckpoint;
- logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
+ if (this.globalCheckpoint <= checkpoint) {
+ this.globalCheckpoint = checkpoint;
+ logger.trace("global checkpoint updated from primary to [{}]", checkpoint);
}
}
/**
- * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that
- * have been removed.
+ * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed.
*
- * @param activeAllocationIds the allocation ids of the currently active shard copies
- * @param initializingAllocationIds the allocation ids of the currently initializing shard copies
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
- public synchronized void updateAllocationIdsFromMaster(Set activeAllocationIds,
- Set initializingAllocationIds) {
+ public synchronized void updateAllocationIdsFromMaster(final Set activeAllocationIds,
+ final Set initializingAllocationIds) {
assignedAllocationIds.removeIf(
aId -> activeAllocationIds.contains(aId) == false && initializingAllocationIds.contains(aId) == false);
assignedAllocationIds.addAll(activeAllocationIds);
@@ -182,22 +185,28 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
}
/**
- * marks the allocationId as "in sync" with the primary shard. This should be called at the end of recovery
- * where the primary knows all operation below the global checkpoint have been completed on this shard.
+ * Marks the shard with the provided allocation ID as in-sync with the primary shard. This should be called at the end of recovery where
+ * the primary knows all operations below the global checkpoint have been completed on this shard.
*
- * @param allocationId allocationId of the recovering shard
+ * @param allocationId the allocation ID of the shard to mark as in-sync
*/
- public synchronized void markAllocationIdAsInSync(String allocationId) {
+ public synchronized void markAllocationIdAsInSync(final String allocationId) {
if (assignedAllocationIds.contains(allocationId) == false) {
- // master have change it's mind and removed this allocation, ignore.
+ // master has removed this allocation, ignore
return;
}
logger.trace("marked [{}] as in sync", allocationId);
inSyncLocalCheckpoints.put(allocationId, UNASSIGNED_SEQ_NO);
}
- // for testing
- synchronized long getLocalCheckpointForAllocation(String allocationId) {
+ /**
+ * Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if
+ * the shard is not in-sync.
+ *
+ * @param allocationId the allocation ID of the shard to obtain the local checkpoint for
+ * @return the local checkpoint, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
+ */
+ synchronized long getLocalCheckpointForAllocationId(final String allocationId) {
if (inSyncLocalCheckpoints.containsKey(allocationId)) {
return inSyncLocalCheckpoints.get(allocationId);
}
diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
index f27dfd189be..8d8be2e402d 100644
--- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
+++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java
@@ -116,7 +116,7 @@ public class GlobalCheckpointTests extends ESTestCase {
// first check that adding it without the master blessing doesn't change anything.
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
- assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(UNASSIGNED_SEQ_NO));
+ assertThat(checkpointService.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set newActive = new HashSet<>(active);
newActive.add(extraId);