Introduce primary context (#25122)

* Introduce primary context

The target of a primary relocation is not aware of the state of the
replication group. In particular, it is not tracking in-sync and
initializing shards and their checkpoints. This means that after the
target shard is started, its knowledge of the replication group could
differ from that of the relocation source, and this differing view can
lead to the target computing a global checkpoint that moves backwards
after it becomes aware of the state of the entire replication group.
This commit addresses this issue by transferring a primary context
during relocation handoff (see the sketch after this commit list).

* Fix test

* Add assertion messages

* Javadocs

* Barrier between marking a shard in sync and relocating

* Fix misplaced call

* Paranoia

* Better latch countdown

* Catch any exception

* Fix comment

* Fix wait for cluster state relocation test

* Update knowledge via the update local checkpoint API

* toString

* Visibility

* Refactor permit

* Push down

* Imports

* Docs

* Fix compilation

* Remove assertion

* Fix compilation

* Remove context wrapper

* Move PrimaryContext to new package

* Piping for cluster state version

This commit adds piping for the cluster state version to the global
checkpoint tracker. We do not use it yet.

* Remove unused import

* Implement versioning in tracker

* Fix test

* Unneeded public

* Imports

* Promote on our own

* Add tests

* Import

* Newline

* Update comment

* Serialization

* Assertion message

* Update stale comment

* Remove newline

* Less verbose

* Remove redundant assertion

* Tracking -> in-sync

* Assertions

* Just say no

Friends do not let friends block the cluster state update thread on
network operations.

* Extra newline

* Add allocation ID to assertion

* Rename method

* Another rename

* Introduce sealing

* Sealing tests

* One more assertion

* Fix imports

* Safer sealing

* Remove check

* Remove another sealed check
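
Taken together, the messages above describe a single handoff protocol. What follows is a condensed sketch assembled from the APIs introduced in the diffs below (primaryContext, releasePrimaryContext, handoffPrimaryContext, updateAllocationIdsFromPrimaryContext); it is a simplification for orientation, not code from the commit:

// Relocation source: sample and seal the tracker state, then hand it off over the wire.
// The handoff runs outside the mutex so the cluster state update thread is never blocked
// on a network operation.
final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext(); // seals the tracker
try {
    recoveryTarget.handoffPrimaryContext(primaryContext); // network round-trip to the relocation target
} catch (final Exception e) {
    getEngine().seqNoService().releasePrimaryContext(); // unseal so the source can continue as primary
    throw e;
}

// Relocation target: fold the received context into its own tracker. The tracker keeps the
// applied cluster state version, so a context sampled from an older cluster state (or a stale
// update from the master) cannot move its knowledge backwards.
indexShard.updateAllocationIdsFromPrimaryContext(primaryContext);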
Jason Tedor 2017-06-26 14:09:15 -04:00 committed by GitHub
parent 4306315ff6
commit c6a03bc549
21 changed files with 908 additions and 103 deletions

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.collect;
public class LongTuple<T> {
public static <T> LongTuple<T> tuple(final T v1, final long v2) {
return new LongTuple<>(v1, v2);
}
private final T v1;
private final long v2;
private LongTuple(final T v1, final long v2) {
this.v1 = v1;
this.v2 = v2;
}
public T v1() {
return v1;
}
public long v2() {
return v2;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LongTuple tuple = (LongTuple) o;
return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2);
}
@Override
public int hashCode() {
int result = v1 != null ? v1.hashCode() : 0;
result = 31 * result + Long.hashCode(v2);
return result;
}
@Override
public String toString() {
return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
}
}
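
A hypothetical usage sketch (allocation IDs and checkpoint values are illustrative, not from the commit): the tracker in a later diff pairs each allocation ID with its local checkpoint and sorts ascending before applying updates, so that the global checkpoint can only advance:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final List<LongTuple<String>> checkpoints = new ArrayList<>();
checkpoints.add(LongTuple.tuple("alloc-1", 17L));
checkpoints.add(LongTuple.tuple("alloc-2", 5L));
checkpoints.sort(Comparator.comparingLong(LongTuple::v2)); // smallest local checkpoint first
assert "alloc-2".equals(checkpoints.get(0).v1());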

View File

@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.LongTuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
 * This class is responsible for tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
@ -42,6 +49,8 @@ import java.util.Set;
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
long appliedClusterStateVersion;
/*
 * This map holds the last known local checkpoint for every active shard and every initializing shard copy that has been brought up to speed
* through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
private long globalCheckpoint;
/*
* During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional
* mutations to this tracker until the handoff has completed.
*/
private boolean sealed = false;
/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @param localCheckpoint the local checkpoint for the shard
*/
public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
if (sealed) {
throw new IllegalStateException("global checkpoint tracker is sealed");
}
final boolean updated;
if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) {
updated = true;
@ -210,11 +228,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(
final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
if (applyingClusterStateVersion < appliedClusterStateVersion) {
return;
}
appliedClusterStateVersion = applyingClusterStateVersion;
// remove shards whose allocation ID no longer exists
inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a));
@ -248,6 +273,135 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
updateGlobalCheckpointOnPrimary();
}
/**
* Get the primary context for the shard. This includes the state of the global checkpoint tracker.
*
* @return the primary context
*/
synchronized PrimaryContext primaryContext() {
if (sealed) {
throw new IllegalStateException("global checkpoint tracker is sealed");
}
sealed = true;
final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints);
final ObjectLongMap<String> trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints);
return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints);
}
/**
* Releases a previously acquired primary context.
*/
synchronized void releasePrimaryContext() {
assert sealed;
sealed = false;
}
/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
* @param primaryContext the primary context
*/
synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
if (sealed) {
throw new IllegalStateException("global checkpoint tracker is sealed");
}
/*
* We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation
* target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary
* context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define
* the following values:
* - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the
* relocation target
* - version(context) = the cluster state version on the relocation source when the primary context was sampled
* - version(target) = the current cluster state version on the relocation target
*
 * We know that version(source) <= version(target). Meanwhile, version(context) < version(target), version(context) = version(target),
 * and version(target) < version(context) are all possibilities.
*
* The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing
* shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target
* receives from the relocation source via the primary context.
*
* Let us now consider the case that version(context) < version(target). In this case, the active allocation IDs in the primary
* context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can
* have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target
* itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary
* context.
*
* Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary
 * context can be a subset of the active allocation IDs contained in the applied cluster state. This is again because no new shards can
* have been started. Moreover, existing active allocation IDs could have been removed from the cluster state.
*
* In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in
* the cluster state applied on the target.
*
* If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already
* applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to
* GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe.
*
 * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update that has not
* yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target).
* Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such,
* these shards are not problematic.
*
 * Lastly, again in these two cases, what about initializing shards that are contained in the cluster state applied on the target but
 * not contained in the primary context?
*
* If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on
* the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not
 * cause a problem and we do not remove these shards from the tracking map, so we are safe here.
*
* If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In
* this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking
* map and are safe here too.
*/
assert StreamSupport
.stream(inSyncLocalCheckpoints.spliterator(), false)
.allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints;
assert StreamSupport
.stream(trackingLocalCheckpoints.spliterator(), false)
.allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints;
assert pendingInSync.isEmpty() : pendingInSync;
if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) {
final Set<String> activeAllocationIds =
new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class)));
final Set<String> initializingAllocationIds =
new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class)));
updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds);
}
/*
* As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means
* that we have to sort the incoming local checkpoints from smallest to largest lest we violate that the global checkpoint does not
* regress.
*/
final List<LongTuple<String>> inSync =
StreamSupport
.stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false)
.map(e -> LongTuple.tuple(e.key, e.value))
.collect(Collectors.toList());
inSync.sort(Comparator.comparingLong(LongTuple::v2));
for (final LongTuple<String> cursor : inSync) {
assert cursor.v2() >= globalCheckpoint
: "local checkpoint [" + cursor.v2() + "] "
+ "for allocation ID [" + cursor.v1() + "] "
+ "violates being at least the global checkpoint [" + globalCheckpoint + "]";
updateLocalCheckpoint(cursor.v1(), cursor.v2());
if (trackingLocalCheckpoints.containsKey(cursor.v1())) {
moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation");
updateGlobalCheckpointOnPrimary();
}
}
for (final ObjectLongCursor<String> cursor : primaryContext.trackingLocalCheckpoints()) {
updateLocalCheckpoint(cursor.key, cursor.value);
}
}
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
* on the specified shard advances above the current global checkpoint.
@ -258,6 +412,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance
*/
public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
if (sealed) {
throw new IllegalStateException("global checkpoint tracker is sealed");
}
if (!trackingLocalCheckpoints.containsKey(allocationId)) {
/*
* This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing
@ -295,15 +452,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE);
if (current >= globalCheckpoint) {
logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current);
trackingLocalCheckpoints.remove(allocationId);
/*
* This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could
* still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the
* shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the
* master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs.
*/
inSyncLocalCheckpoints.put(allocationId, current);
moveAllocationIdFromTrackingToInSync(allocationId, "recovery");
break;
} else {
waitForLocalCheckpointToAdvance();
@ -311,6 +466,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
}
/**
* Moves a tracking allocation ID to be in-sync. This can occur when a shard is recovering from the primary and its local checkpoint has
* advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the
* relocation source.
*
* @param allocationId the allocation ID to move
* @param reason the reason for the transition
*/
private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) {
assert trackingLocalCheckpoints.containsKey(allocationId);
final long current = trackingLocalCheckpoints.remove(allocationId);
inSyncLocalCheckpoints.put(allocationId, current);
logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason);
}
/**
* Wait for the local checkpoint to advance to the global checkpoint.
*
@ -324,12 +494,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Check if there are any recoveries pending in-sync.
*
* @return {@code true} if there is at least one shard pending in-sync, otherwise false
* @return true if there is at least one shard pending in-sync, otherwise false
*/
public boolean pendingInSync() {
boolean pendingInSync() {
return !pendingInSync.isEmpty();
}
/**
* Check if the tracker is sealed.
*
* @return true if the tracker is sealed, otherwise false.
*/
boolean sealed() {
return sealed;
}
/**
* Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if
* the shard is not in-sync.

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import java.util.Set;
@ -165,13 +166,24 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
 * @param primaryContext the primary context
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
}
/**
@ -183,4 +195,20 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
return globalCheckpointTracker.pendingInSync();
}
/**
* Get the primary context for the shard. This includes the state of the global checkpoint tracker.
*
* @return the primary context
*/
public PrimaryContext primaryContext() {
return globalCheckpointTracker.primaryContext();
}
/**
* Releases a previously acquired primary context.
*/
public void releasePrimaryContext() {
globalCheckpointTracker.releasePrimaryContext();
}
}
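
A sketch of the sealing contract behind these delegates, using the tracker-level methods from the previous file (the allocation ID and checkpoint are illustrative):

final PrimaryContext context = globalCheckpointTracker.primaryContext(); // samples and seals the tracker
try {
    globalCheckpointTracker.updateLocalCheckpoint("alloc-1", 42L); // rejected while sealed
} catch (final IllegalStateException expected) {
    // "global checkpoint tracker is sealed"
} finally {
    globalCheckpointTracker.releasePrimaryContext(); // unseal, e.g. after a failed handoff
}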

View File

@ -515,32 +515,38 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
/**
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
 * {@link Consumer} is executed after all operations are successfully blocked.
*
* @param reason the reason for the relocation
 * @param consumer a {@link Consumer} that is given the primary context after operations are blocked
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
* @throws InterruptedException if blocking operations is interrupted
*/
public void relocated(
final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
*/
verifyRelocatingState();
final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext();
try {
consumer.accept(primaryContext);
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
// if the master cancelled the recovery, the target will be removed
// and the recovery will stopped.
// However, it is still possible that we concurrently end up here
// and therefore have to protect we don't mark the shard as relocated when
// its shard routing says otherwise.
if (shardRouting.relocating() == false) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
}
verifyRelocatingState();
changeState(IndexShardState.RELOCATED, reason);
}
} catch (final Exception e) {
getEngine().seqNoService().releasePrimaryContext();
throw e;
}
});
} catch (TimeoutException e) {
logger.warn("timed out waiting for relocation hand-off to complete");
@ -551,6 +557,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
private void verifyRelocatingState() {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
/*
* If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible
* that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing
* says otherwise.
*/
if (shardRouting.relocating() == false) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
}
}
public IndexShardState state() {
return state;
@ -1319,7 +1345,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void verifyPrimary() {
if (shardRouting.primary() == false) {
throw new IllegalStateException("shard is not a primary " + shardRouting);
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
}
@ -1327,8 +1353,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("active primary shard cannot be a replication target before " +
" relocation hand off " + shardRouting + ", state is [" + state + "]");
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
"relocation hand off, state is [" + state + "]");
}
}
@ -1603,8 +1629,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
/*
* We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the
* replica; mark our self as active to force a future background sync.
* We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
 * the replica; mark ourselves as active to force a future background sync.
*/
active.compareAndSet(false, true);
}
@ -1654,18 +1680,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)}
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
* for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
verifyPrimary();
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
}
/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
 * @param primaryContext the primary context
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
final Engine engine = getEngineOrNull();
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext);
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing
* shards and their local checkpoints.
*/
public class PrimaryContext implements Writeable {
private long clusterStateVersion;
public long clusterStateVersion() {
return clusterStateVersion;
}
private ObjectLongMap<String> inSyncLocalCheckpoints;
public ObjectLongMap<String> inSyncLocalCheckpoints() {
return inSyncLocalCheckpoints;
}
private ObjectLongMap<String> trackingLocalCheckpoints;
public ObjectLongMap<String> trackingLocalCheckpoints() {
return trackingLocalCheckpoints;
}
public PrimaryContext(
final long clusterStateVersion,
final ObjectLongMap<String> inSyncLocalCheckpoints,
final ObjectLongMap<String> trackingLocalCheckpoints) {
this.clusterStateVersion = clusterStateVersion;
this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
this.trackingLocalCheckpoints = trackingLocalCheckpoints;
}
public PrimaryContext(final StreamInput in) throws IOException {
clusterStateVersion = in.readVLong();
inSyncLocalCheckpoints = readMap(in);
trackingLocalCheckpoints = readMap(in);
}
private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException {
final int length = in.readVInt();
final ObjectLongMap<String> map = new ObjectLongHashMap<>(length);
for (int i = 0; i < length; i++) {
final String key = in.readString();
final long value = in.readZLong();
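// each key appears at most once on the wire, so addTo is equivalent to put here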
map.addTo(key, value);
}
return map;
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(clusterStateVersion);
writeMap(out, inSyncLocalCheckpoints);
writeMap(out, trackingLocalCheckpoints);
}
private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException {
out.writeVInt(map.size());
for (ObjectLongCursor<String> cursor : map) {
out.writeString(cursor.key);
out.writeZLong(cursor.value);
}
}
@Override
public String toString() {
return "PrimaryContext{" +
"clusterStateVersion=" + clusterStateVersion +
", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints +
", trackingLocalCheckpoints=" + trackingLocalCheckpoints +
'}';
}
}
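
A serialization round-trip sketch for the Writeable contract above; BytesStreamOutput (org.elasticsearch.common.io.stream.BytesStreamOutput) is the in-memory stream used throughout this codebase, the values are illustrative, and IOException handling is elided:

final ObjectLongMap<String> inSync = new ObjectLongHashMap<>();
inSync.put("alloc-1", 17L);
final PrimaryContext original = new PrimaryContext(3L, inSync, new ObjectLongHashMap<>());
final BytesStreamOutput out = new BytesStreamOutput();
original.writeTo(out);
final PrimaryContext copy = new PrimaryContext(out.bytes().streamInput());
assert copy.clusterStateVersion() == 3L;
assert copy.inSyncLocalCheckpoints().get("alloc-1") == 17L;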

View File

@ -571,7 +571,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds);
}
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
@ -758,12 +758,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
void updateAllocationIdsFromMaster(
long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {

View File

@ -82,6 +82,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context";
}
private final ThreadPool threadPool;
@ -116,6 +117,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
RecoveryHandoffPrimaryContextRequest::new,
ThreadPool.Names.GENERIC,
new HandoffPrimaryContextRequestHandler());
}
@Override
@ -411,6 +417,18 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
}
class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
@Override
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().handoffPrimaryContext(request.primaryContext());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
/**
 * The request object to hand off the primary context to the relocation target.
*/
class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private PrimaryContext primaryContext;
/**
 * Initialize an empty request (used to deserialize into when reading from a stream).
*/
RecoveryHandoffPrimaryContextRequest() {
}
/**
* Initialize a request for the specified relocation.
*
* @param recoveryId the recovery ID of the relocation
* @param shardId the shard ID of the relocation
* @param primaryContext the primary context
*/
RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.primaryContext = primaryContext;
}
long recoveryId() {
return this.recoveryId;
}
ShardId shardId() {
return shardId;
}
PrimaryContext primaryContext() {
return primaryContext;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
primaryContext = new PrimaryContext(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
primaryContext.writeTo(out);
}
@Override
public String toString() {
return "RecoveryHandoffPrimaryContextRequest{" +
"recoveryId=" + recoveryId +
", shardId=" + shardId +
", primaryContext=" + primaryContext +
'}';
}
}

View File

@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
@ -41,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@ -52,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import java.io.BufferedOutputStream;
@ -60,7 +63,9 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@ -450,7 +455,21 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
 * shard as in-sync and relocating a shard. If we acquire the permit, then no relocation handoff can complete before we are done
 * marking the shard as in-sync. If the relocation handoff holds all the permits, then after the handoff completes and we acquire
 * the permit, the state of the shard will be relocated and this recovery will fail.
*/
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
try (Releasable ignored = onAcquired.actionGet()) {
if (shard.state() == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shard.shardId());
}
shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
}
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});
@ -465,7 +484,7 @@ public class RecoverySourceHandler {
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
logger.trace("performing relocation hand-off");
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
}
/*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and

View File

@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@ -379,6 +378,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
ensureClusterStateVersionCallback.accept(clusterStateVersion);
}
@Override
public void handoffPrimaryContext(final PrimaryContext primaryContext) {
indexShard.updateAllocationIdsFromPrimaryContext(primaryContext);
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
@ -49,6 +50,13 @@ public interface RecoveryTargetHandler {
*/
void ensureClusterStateVersion(long clusterStateVersion);
/**
 * Hands off the primary context from the relocation source to the relocation target.
*
* @param primaryContext the primary context from the relocation source
*/
void handoffPrimaryContext(PrimaryContext primaryContext);
/**
* Index a set of translog operations on the target
* @param operations operations to index

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -98,6 +99,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void handoffPrimaryContext(final PrimaryContext primaryContext) {
transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =

View File

@ -2022,7 +2022,10 @@ public class InternalEngineTests extends ESTestCase {
initialEngine = engine;
initialEngine
.seqNoService()
.updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet());
.updateAllocationIdsFromMaster(
randomNonNegativeLong(),
new HashSet<>(Arrays.asList("primary", "replica")),
Collections.emptySet());
for (int op = 0; op < opCount; op++) {
final String id;
// mostly index, sometimes delete

View File

@ -122,6 +122,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
private long clusterStateVersion;
private IndexShard primary;
private IndexMetaData indexMetaData;
private final List<IndexShard> replicas;
@ -142,6 +143,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting));
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
clusterStateVersion = 1;
updateAllocationIDsOnPrimary();
for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
addReplica();
@ -222,6 +224,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
clusterStateVersion++;
updateAllocationIDsOnPrimary();
for (final IndexShard replica : replicas) {
recoverReplica(replica);
@ -241,6 +244,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replicas.add(replica);
clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@ -255,6 +259,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
getEngineFactory(shardRouting));
replicas.add(newReplica);
clusterStateVersion++;
updateAllocationIDsOnPrimary();
return newReplica;
}
@ -274,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@ -289,6 +295,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
fut.onFailure(e);
}
}));
clusterStateVersion++;
updateAllocationIDsOnPrimary();
return fut;
}
@ -296,6 +303,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
synchronized boolean removeReplica(IndexShard replica) {
final boolean removed = replicas.remove(replica);
if (removed) {
clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
return removed;
@ -315,6 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
clusterStateVersion++;
updateAllocationIDsOnPrimary();
}
@ -402,7 +411,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
initializing.add(shard.allocationId().getId());
}
}
primary.updateAllocationIdsFromMaster(active, initializing);
primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing);
}
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -29,7 +33,6 @@ import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -43,11 +46,15 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
public class GlobalCheckpointTrackerTests extends ESTestCase {
@ -79,6 +86,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testGlobalCheckpointUpdate() {
final long initialClusterStateVersion = randomNonNegativeLong();
Map<String, Long> allocations = new HashMap<>();
Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> active = new HashSet<>(activeWithCheckpoints.keySet());
@ -107,7 +115,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
tracker.updateAllocationIdsFromMaster(active, initializing);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing);
initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getGlobalCheckpoint()));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
@ -130,7 +138,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
tracker.updateAllocationIdsFromMaster(newActive, initializing);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActive, initializing);
// now notify for the new id
tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4));
@ -146,6 +154,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
assigned.putAll(active);
assigned.putAll(initializing);
tracker.updateAllocationIdsFromMaster(
randomNonNegativeLong(),
active.keySet(),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
@ -166,7 +175,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
@ -184,7 +193,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
@ -196,6 +205,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
final long initialClusterStateVersion = randomNonNegativeLong();
final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
@ -211,7 +221,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
tracker.updateAllocationIdsFromMaster(active, initializing);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint()));
} else {
@ -223,11 +233,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
// now remove shards
if (randomBoolean()) {
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
@ -243,7 +253,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean complete = new AtomicBoolean();
final String inSyncAllocationId = randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(16);
tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateAllocationIdsFromMaster(
randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@ -291,7 +302,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean interrupted = new AtomicBoolean();
final String inSyncAllocationId = randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(32);
tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateAllocationIdsFromMaster(
randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@ -329,21 +341,14 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testUpdateAllocationIdsFromMaster() throws Exception {
final long initialClusterStateVersion = randomNonNegativeLong();
final int numberOfActiveAllocationsIds = randomIntBetween(2, 16);
final Set<String> activeAllocationIds =
IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet());
final int numberOfInitializingIds = randomIntBetween(2, 16);
final Set<String> initializingIds =
IntStream.range(0, numberOfInitializingIds).mapToObj(i -> {
do {
final String initializingId = randomAlphaOfLength(16);
// ensure we do not duplicate an allocation ID in active and initializing sets
if (!activeAllocationIds.contains(initializingId)) {
return initializingId;
}
} while (true);
}).collect(Collectors.toSet());
tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds);
final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<String> initializingIds = activeAndInitializingAllocationIds.v2();
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds);
// first we assert that the in-sync and tracking sets are set up correctly
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
@@ -364,7 +369,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final List<String> removingInitializingAllocationIds = randomSubsetOf(initializingIds);
final Set<String> newInitializingAllocationIds =
initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet());
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
@@ -376,7 +381,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
*/
newActiveAllocationIds.add(randomAlphaOfLength(32));
newInitializingAllocationIds.add(randomAlphaOfLength(64));
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(
newActiveAllocationIds
@@ -416,7 +421,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
// using a different length than we have been using above ensures that we cannot collide with a previous allocation ID
final String newSyncingAllocationId = randomAlphaOfLength(128);
newInitializingAllocationIds.add(newSyncingAllocationId);
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds);
final CyclicBarrier barrier = new CyclicBarrier(2);
final Thread thread = new Thread(() -> {
try {
@@ -450,7 +455,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
* the in-sync set even if we receive a cluster state update that does not reflect this.
*
*/
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds);
assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId));
assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId));
}
@@ -471,7 +476,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final String active = randomAlphaOfLength(16);
final String initializing = randomAlphaOfLength(32);
tracker.updateAllocationIdsFromMaster(Collections.singleton(active), Collections.singleton(initializing));
tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing));
final CyclicBarrier barrier = new CyclicBarrier(4);
@@ -516,7 +521,216 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
markingThread.join();
assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint));
}
public void testPrimaryContextOlderThanAppliedClusterState() {
final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE - 1) + 1;
final int numberOfActiveAllocationsIds = randomIntBetween(0, 8);
final int numberOfInitializingIds = randomIntBetween(0, 8);
final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<String> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds);
/*
* We are going to establish a primary context from a cluster state version older than the applied cluster state version on the
* tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the
* older cluster state (the primary context) is a superset of the active allocation IDs in the applied cluster state with the caveat that an existing
* initializing allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the
* set of initializing allocation IDs is otherwise arbitrary.
*/
final int numberOfAdditionalInitializingAllocationIds = randomIntBetween(0, 8);
final Set<String> initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds));
final Set<String> newInitializingAllocationIds =
randomAllocationIdsExcludingExistingIds(
Sets.union(activeAllocationIds, initializingAllocationIds), numberOfAdditionalInitializingAllocationIds);
final Set<String> contextInitializingIds = Sets.union(
new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))),
newInitializingAllocationIds);
final int numberOfAdditionalActiveAllocationIds = randomIntBetween(0, 8);
final Set<String> contextActiveAllocationIds = Sets.union(
Sets.union(
activeAllocationIds,
randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfAdditionalActiveAllocationIds)),
initializedAllocationIds);
final ObjectLongMap<String> activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
for (final String allocationId : contextActiveAllocationIds) {
activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
}
final ObjectLongMap<String> initializingAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
for (final String allocationId : contextInitializingIds) {
initializingAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
}
final PrimaryContext primaryContext = new PrimaryContext(
initialClusterStateVersion - randomIntBetween(0, Math.toIntExact(initialClusterStateVersion) - 1),
activeAllocationIdsLocalCheckpoints,
initializingAllocationIdsLocalCheckpoints);
tracker.updateAllocationIdsFromPrimaryContext(primaryContext);
// the primary context carries an older cluster state version
assertThat(tracker.appliedClusterStateVersion, equalTo(initialClusterStateVersion));
// only existing active allocation IDs and initializing allocation IDs that moved to initialized should be in-sync
assertThat(
Sets.union(activeAllocationIds, initializedAllocationIds),
equalTo(
StreamSupport
.stream(tracker.inSyncLocalCheckpoints.keys().spliterator(), false)
.map(e -> e.value)
.collect(Collectors.toSet())));
// the local checkpoints known to the tracker for in-sync shards should match what is known in the primary context
for (final String allocationId : Sets.union(activeAllocationIds, initializedAllocationIds)) {
assertThat(
tracker.inSyncLocalCheckpoints.get(allocationId), equalTo(primaryContext.inSyncLocalCheckpoints().get(allocationId)));
}
// only existing initializing allocation IDs that did not move to initialized should be tracked
assertThat(
Sets.difference(initializingAllocationIds, initializedAllocationIds),
equalTo(
StreamSupport
.stream(tracker.trackingLocalCheckpoints.keys().spliterator(), false)
.map(e -> e.value)
.collect(Collectors.toSet())));
// the local checkpoints known to the tracker for initializing shards should match what is known in the primary context
for (final String allocationId : Sets.difference(initializingAllocationIds, initializedAllocationIds)) {
if (primaryContext.trackingLocalCheckpoints().containsKey(allocationId)) {
assertThat(
tracker.trackingLocalCheckpoints.get(allocationId),
equalTo(primaryContext.trackingLocalCheckpoints().get(allocationId)));
} else {
assertThat(tracker.trackingLocalCheckpoints.get(allocationId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}
// the global checkpoint can only be computed from active allocation IDs and initializing allocation IDs that moved to initialized
final long globalCheckpoint =
StreamSupport
.stream(activeAllocationIdsLocalCheckpoints.spliterator(), false)
.filter(e -> tracker.inSyncLocalCheckpoints.containsKey(e.key) || initializedAllocationIds.contains(e.key))
.mapToLong(e -> e.value)
.min()
.orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO);
assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint));
}
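    /*
     * Editorial sketch, not part of this change: the behavior that this test and the
     * next one pin down amounts to a version comparison when a primary context
     * arrives. The field and method names below are hypothetical (and an HPPC cursor
     * import is assumed); a context from an older cluster state only contributes
     * local checkpoint knowledge, while a context from a newer cluster state replaces
     * the tracker's view wholesale. Either way, the global checkpoint remains the
     * minimum of the in-sync local checkpoints.
     *
     *     private synchronized void applyPrimaryContext(final PrimaryContext context) {
     *         if (context.clusterStateVersion() > appliedClusterStateVersion) {
     *             // newer context: adopt its in-sync and tracking sets outright
     *             inSyncLocalCheckpoints.clear();
     *             trackingLocalCheckpoints.clear();
     *             inSyncLocalCheckpoints.putAll(context.inSyncLocalCheckpoints());
     *             trackingLocalCheckpoints.putAll(context.trackingLocalCheckpoints());
     *             appliedClusterStateVersion = context.clusterStateVersion();
     *         } else {
     *             // older context: keep the applied sets, absorb the context's
     *             // checkpoints, and promote tracked IDs the context reports in-sync
     *             for (final ObjectLongCursor<String> cursor : context.inSyncLocalCheckpoints()) {
     *                 if (trackingLocalCheckpoints.containsKey(cursor.key)) {
     *                     trackingLocalCheckpoints.remove(cursor.key);
     *                     inSyncLocalCheckpoints.put(cursor.key, cursor.value);
     *                 } else if (inSyncLocalCheckpoints.containsKey(cursor.key)) {
     *                     inSyncLocalCheckpoints.put(cursor.key, cursor.value);
     *                 }
     *             }
     *         }
     *     }
     */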
public void testPrimaryContextNewerThanAppliedClusterState() {
final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE);
final int numberOfActiveAllocationsIds = randomIntBetween(0, 8);
final int numberOfInitializingIds = randomIntBetween(0, 8);
final Tuple<Set<String>, Set<String>> activeAndInitializingAllocationIds =
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<String> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<String> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds);
/*
* We are going to establish a primary context from a cluster state version newer than the applied cluster state version on the
* tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the
* newer cluster state is a subset of the allocation IDs in the applied cluster state with the caveat that an existing initializing
* allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the set of
* initializing allocation IDs is otherwise arbitrary.
*/
final int numberOfNewInitializingAllocationIds = randomIntBetween(0, 8);
final Set<String> initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds));
final Set<String> newInitializingAllocationIds =
randomAllocationIdsExcludingExistingIds(
Sets.union(activeAllocationIds, initializingAllocationIds), numberOfNewInitializingAllocationIds);
final ObjectLongMap<String> activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>();
for (final String allocationId : Sets.union(new HashSet<>(randomSubsetOf(activeAllocationIds)), initializedAllocationIds)) {
activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
}
final ObjectLongMap<String> initializingIdsLocalCheckpoints = new ObjectLongHashMap<>();
final Set<String> contextInitializingAllocationIds = Sets.union(
new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))),
newInitializingAllocationIds);
for (final String allocationId : contextInitializingAllocationIds) {
initializingIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong());
}
final PrimaryContext primaryContext =
new PrimaryContext(
initialClusterStateVersion + randomIntBetween(0, Integer.MAX_VALUE) + 1,
activeAllocationIdsLocalCheckpoints,
initializingIdsLocalCheckpoints);
tracker.updateAllocationIdsFromPrimaryContext(primaryContext);
final PrimaryContext trackerPrimaryContext = tracker.primaryContext();
try {
assertTrue(tracker.sealed());
final long globalCheckpoint =
StreamSupport
.stream(activeAllocationIdsLocalCheckpoints.values().spliterator(), false)
.mapToLong(e -> e.value)
.min()
.orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO);
// the primary context contains knowledge of the state of the entire universe
assertThat(primaryContext.clusterStateVersion(), equalTo(trackerPrimaryContext.clusterStateVersion()));
assertThat(primaryContext.inSyncLocalCheckpoints(), equalTo(trackerPrimaryContext.inSyncLocalCheckpoints()));
assertThat(primaryContext.trackingLocalCheckpoints(), equalTo(trackerPrimaryContext.trackingLocalCheckpoints()));
assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint));
} finally {
tracker.releasePrimaryContext();
assertFalse(tracker.sealed());
}
}
public void testPrimaryContextSealing() {
// the tracker should start in the state of not being sealed
assertFalse(tracker.sealed());
// sampling the primary context should seal the tracker
tracker.primaryContext();
assertTrue(tracker.sealed());
// invoking any method that mutates the state of the tracker should fail
assertIllegalStateExceptionWhenSealed(() -> tracker.updateLocalCheckpoint(randomAlphaOfLength(16), randomNonNegativeLong()));
assertIllegalStateExceptionWhenSealed(() -> tracker.updateGlobalCheckpointOnReplica(randomNonNegativeLong()));
assertIllegalStateExceptionWhenSealed(
() -> tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.emptySet(), Collections.emptySet()));
assertIllegalStateExceptionWhenSealed(() -> tracker.updateAllocationIdsFromPrimaryContext(mock(PrimaryContext.class)));
assertIllegalStateExceptionWhenSealed(() -> tracker.primaryContext());
assertIllegalStateExceptionWhenSealed(() -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(16), randomNonNegativeLong()));
// closing the releasable should unseal the tracker
tracker.releasePrimaryContext();
assertFalse(tracker.sealed());
}
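    /*
     * Editorial sketch, not part of this change: the guard implied by the sealing
     * assertions above, with hypothetical names. Sampling the primary context sets
     * the flag, releasing it clears the flag, and every mutating entry point checks
     * it first, which is exactly what assertIllegalStateExceptionWhenSealed
     * exercises:
     *
     *     private boolean sealed;
     *
     *     private synchronized void ensureNotSealed() {
     *         if (sealed) {
     *             throw new IllegalStateException("global checkpoint tracker is sealed");
     *         }
     *     }
     */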
private void assertIllegalStateExceptionWhenSealed(final ThrowingRunnable runnable) {
final IllegalStateException e = expectThrows(IllegalStateException.class, runnable);
assertThat(e, hasToString(containsString("global checkpoint tracker is sealed")));
}
private Tuple<Set<String>, Set<String>> randomActiveAndInitializingAllocationIds(
final int numberOfActiveAllocationsIds,
final int numberOfInitializingIds) {
final Set<String> activeAllocationIds =
IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16) + i).collect(Collectors.toSet());
final Set<String> initializingIds = randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfInitializingIds);
return Tuple.tuple(activeAllocationIds, initializingIds);
}
private Set<String> randomAllocationIdsExcludingExistingIds(final Set<String> existingAllocationIds, final int numberOfAllocationIds) {
return IntStream.range(0, numberOfAllocationIds).mapToObj(i -> {
do {
final String newAllocationId = randomAlphaOfLength(16);
// ensure we do not duplicate an allocation ID
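// appending the stream index below also keeps the generated IDs distinct from one another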
if (!existingAllocationIds.contains(newAllocationId)) {
return newAllocationId + i;
}
} while (true);
}).collect(Collectors.toSet());
}
private void markAllocationIdAsInSyncQuietly(
@@ -536,7 +536,7 @@ public class IndexShardTests extends IndexShardTestCase {
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
indexShard.relocated("test");
indexShard.relocated("test", primaryContext -> {});
engineClosed = false;
break;
}
@@ -551,7 +551,7 @@ public class IndexShardTests extends IndexShardTestCase {
if (shardRouting.primary() == false) {
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX));
assertThat(e, hasToString(containsString("shard is not a primary")));
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
}
final long primaryTerm = indexShard.getPrimaryTerm();
@@ -1042,7 +1042,7 @@ public class IndexShardTests extends IndexShardTestCase {
Thread recoveryThread = new Thread(() -> {
latch.countDown();
try {
shard.relocated("simulated recovery");
shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1071,7 +1071,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> {
try {
shard.relocated("simulated recovery");
shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1124,7 +1124,7 @@ public class IndexShardTests extends IndexShardTestCase {
AtomicBoolean relocated = new AtomicBoolean();
final Thread recoveryThread = new Thread(() -> {
try {
shard.relocated("simulated recovery");
shard.relocated("simulated recovery", primaryContext -> {});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -1158,7 +1158,7 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.relocated("test");
shard.relocated("test", primaryContext -> {});
expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
closeShards(shard);
}
@@ -1168,7 +1168,7 @@ public class IndexShardTests extends IndexShardTestCase {
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.updateRoutingEntry(originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test"));
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {}));
closeShards(shard);
}
@@ -1187,7 +1187,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
shard.relocated("test");
shard.relocated("test", primaryContext -> {});
}
});
relocationThread.start();
@@ -1362,7 +1362,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
shard.updateRoutingEntry(inRecoveryRouting);
shard.relocated("simulate mark as relocated");
shard.relocated("simulate mark as relocated", primaryContext -> {});
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try {
shard.updateRoutingEntry(origRouting);
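The updated call sites above reflect the new handoff signature: relocated now also takes a consumer that is handed the primary context during relocation handoff. A rough sketch of the shape, with an assumed parameter name (consistent with the mock in RecoverySourceHandlerTests below, which stubs relocated(any(String.class), any(Consumer.class))):

    public void relocated(String reason, Consumer<PrimaryContext> consumePrimaryContext) throws InterruptedException;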
@@ -62,7 +62,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet());
shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
@@ -322,6 +322,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
* Mock for {@link IndexShard}
*/
protected class MockIndexShard implements IndicesClusterStateService.Shard {
private volatile long clusterStateVersion;
private volatile ShardRouting shardRouting;
private volatile RecoveryState recoveryState;
private volatile Set<String> activeAllocationIds;
@@ -372,7 +373,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
public void updateAllocationIdsFromMaster(
long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
this.clusterStateVersion = applyingClusterStateVersion;
this.activeAllocationIds = activeAllocationIds;
this.initializingAllocationIds = initializingAllocationIds;
}
@@ -35,6 +35,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
@@ -76,6 +77,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -453,8 +455,14 @@ public class RecoverySourceHandlerTests extends ESTestCase {
relocated.set(true);
assertTrue(recoveriesDelayed.get());
return null;
}).when(shard).relocated(any(String.class));
}).when(shard).relocated(any(String.class), any(Consumer.class));
when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
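// stub the permit acquisition so the handoff path can proceed: the listener is completed immediately with a no-op releasable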
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
final ActionListener<Releasable> listener = (ActionListener<Releasable>)invocationOnMock.getArguments()[0];
listener.onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(ActionListener.class), any(String.class));
@@ -53,7 +53,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllS
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE")
@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE")
public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
private final Logger logger = Loggers.getLogger(RecoveryWhileUnderLoadIT.class);
@@ -514,14 +514,6 @@ public class RelocationIT extends ESIntegTestCase {
// refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down
client().admin().indices().prepareRefresh("test").get();
/*
* We have to execute a second refresh as in the face of relocations, the relocation target is not aware of the in-sync set and so
* the first refresh would bring back the local checkpoint for any shards added to the in-sync set that the relocation target was
* not tracking.
*/
// TODO: remove this after a primary context is transferred during relocation handoff
client().admin().indices().prepareRefresh("test").get();
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {