Cleanup comments in GlobalCheckpointService.java

This commit cleans up the comments in GlobalCheckpointService, making
them uniform in their formatting and taking advantage of the line-length
limit of 140 characters.
This commit is contained in:
Jason Tedor 2017-01-03 12:23:33 -05:00
parent 64888ab1d3
commit 9a65d2008e
2 changed files with 75 additions and 66 deletions

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
@ -31,95 +32,94 @@ import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint
* is the highest seq_no for which all lower (or equal) seq_no have been processed on all shards that
* are currently active. Since shards count as "active" when the master starts them, and before this primary shard
* has been notified of this fact, we also include shards that have completed recovery. These shards have received
* all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set
* of shards that are taken into account for the global checkpoint calculation are called the "in sync" shards.
*
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas
* (via {@link GlobalCheckpointSyncAction}).
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
/**
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up
* to speed through recovery. These shards are treated as valid copies and participate in determining the global
* checkpoint.
* <p>
* Keyed by allocation ids.
/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
* through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
* allocation IDs. All accesses to this set are guarded by a lock on this.
*/
private final ObjectLongMap<String> inSyncLocalCheckpoints; // keyed by allocation ids
private final ObjectLongMap<String> inSyncLocalCheckpoints;
/**
* This set holds the last set of known valid allocation ids as received by the master. This is important to make sure
* shard that are failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global
* checkpoint back
/*
* This set holds the last set of known valid allocation ids as received by the master. This is important to make sure shard that are
* failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global checkpoint back. All accesses to
* this set are guarded by a lock on this.
*/
private final Set<String> assignedAllocationIds;
/*
* The current global checkpoint for this shard. Note that this field is guarded by a lock on this and thus this field does not need to
* be volatile.
*/
private long globalCheckpoint;
/**
* Initialize the global checkpoint service. The {@code globalCheckpoint}
* should be set to the last known global checkpoint for this shard, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
* @param shardId the shard this service is tracking local checkpoints for
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard,
* or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint:" + globalCheckpoint;
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
}
/**
* Notifies the service of a local checkpoint. If the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not in sync, it is ignored. This to prevent late
* arrivals from shards that are removed to be re-added.
* Notifies the service to update the local checkpoint for the shard with the provided allocation ID. If the checkpoint is lower than
* the currently known one, this is a no-op. If the allocation ID is not in sync, it is ignored. This is to prevent late arrivals from
* shards that are removed to be re-added.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
*/
public synchronized void updateLocalCheckpoint(String allocationId, long localCheckpoint) {
public synchronized void updateLocalCheckpoint(final String allocationId, final long checkpoint) {
final int indexOfKey = inSyncLocalCheckpoints.indexOf(allocationId);
if (indexOfKey >= 0) {
final long current = inSyncLocalCheckpoints.indexGet(indexOfKey);
if (current < localCheckpoint) {
inSyncLocalCheckpoints.indexReplace(indexOfKey, localCheckpoint);
if (current < checkpoint) {
inSyncLocalCheckpoints.indexReplace(indexOfKey, checkpoint);
if (logger.isTraceEnabled()) {
logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, localCheckpoint, current);
logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, checkpoint, current);
}
} else {
logger.trace("skipping update of local checkpoint [{}], current checkpoint is higher " +
"(current [{}], incoming [{}], type [{}])",
allocationId, current, localCheckpoint, allocationId);
logger.trace(
"skipping update of local checkpoint [{}], current checkpoint is higher (current [{}], incoming [{}], type [{}])",
allocationId,
current,
checkpoint,
allocationId);
}
} else {
logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, localCheckpoint);
logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, checkpoint);
}
}
/**
* Scans through the currently known local checkpoints and updates the global checkpoint accordingly.
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
* @return {@code true} if the checkpoint has been updated or if it can not be updated since one of the local checkpoints of one of the
* active allocations is not known.
*/
synchronized boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (inSyncLocalCheckpoints.isEmpty()) {
return false;
}
for (ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
for (final ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
if (cp.value == UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key);
return true;
@ -139,36 +139,39 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
}
/**
* gets the current global checkpoint. See java docs for {@link GlobalCheckpointService} for more details
* Returns the global checkpoint for the shard.
*
* @return the global checkpoint
*/
public synchronized long getCheckpoint() {
return globalCheckpoint;
}
/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
* @param checkpoint the global checkpoint
*/
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
synchronized void updateCheckpointOnReplica(final long checkpoint) {
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
*/
if (this.globalCheckpoint <= globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
if (this.globalCheckpoint <= checkpoint) {
this.globalCheckpoint = checkpoint;
logger.trace("global checkpoint updated from primary to [{}]", checkpoint);
}
}
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that
* have been removed.
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(Set<String> activeAllocationIds,
Set<String> initializingAllocationIds) {
public synchronized void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds,
final Set<String> initializingAllocationIds) {
assignedAllocationIds.removeIf(
aId -> activeAllocationIds.contains(aId) == false && initializingAllocationIds.contains(aId) == false);
assignedAllocationIds.addAll(activeAllocationIds);
@ -182,22 +185,28 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
}
/**
* marks the allocationId as "in sync" with the primary shard. This should be called at the end of recovery
* where the primary knows all operation below the global checkpoint have been completed on this shard.
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This should be called at the end of recovery where
* the primary knows all operations below the global checkpoint have been completed on this shard.
*
* @param allocationId allocationId of the recovering shard
* @param allocationId the allocation ID of the shard to mark as in-sync
*/
public synchronized void markAllocationIdAsInSync(String allocationId) {
public synchronized void markAllocationIdAsInSync(final String allocationId) {
if (assignedAllocationIds.contains(allocationId) == false) {
// master have change it's mind and removed this allocation, ignore.
// master has removed this allocation, ignore
return;
}
logger.trace("marked [{}] as in sync", allocationId);
inSyncLocalCheckpoints.put(allocationId, UNASSIGNED_SEQ_NO);
}
// for testing
synchronized long getLocalCheckpointForAllocation(String allocationId) {
/**
* Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if
* the shard is not in-sync.
*
* @param allocationId the allocation ID of the shard to obtain the local checkpoint for
* @return the local checkpoint, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
synchronized long getLocalCheckpointForAllocationId(final String allocationId) {
if (inSyncLocalCheckpoints.containsKey(allocationId)) {
return inSyncLocalCheckpoints.get(allocationId);
}

View File

@ -116,7 +116,7 @@ public class GlobalCheckpointTests extends ESTestCase {
// first check that adding it without the master blessing doesn't change anything.
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);