Use a single method to update shard state
This commit refactors IndexShard to provide a single method for updating the shard state on an incoming cluster state update.

Relates #25431
parent ebdae09df3
commit 5a4a47332c
core/src
  main/java/org/elasticsearch
  test/java/org/elasticsearch
test/framework/src/main/java/org/elasticsearch/index/shard
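At a call-site level, the change collapses three separate IndexShard mutators (updateRoutingEntry, updatePrimaryTerm, updateAllocationIdsFromMaster) into one updateShardState call. The sketch below is illustrative only and is not part of the commit; the class name ShardStateUpdateSketch and the method applyClusterStateUpdate are made up for the example, while the IndexShard and PrimaryReplicaSyncer signatures are taken from the diff that follows.

// Sketch only (not part of this commit). Class and method names here are
// hypothetical; the ES types and signatures used are those shown in the diff below.
import java.io.IOException;
import java.util.Set;

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;

class ShardStateUpdateSketch {

    private final PrimaryReplicaSyncer primaryReplicaSyncer;

    ShardStateUpdateSketch(PrimaryReplicaSyncer primaryReplicaSyncer) {
        this.primaryReplicaSyncer = primaryReplicaSyncer;
    }

    void applyClusterStateUpdate(IndexShard shard, ShardRouting newRouting, long newPrimaryTerm,
                                 long clusterStateVersion, Set<String> activeIds,
                                 Set<String> initializingIds) throws IOException {
        // Before this commit, applying a cluster state update took up to three calls:
        //   shard.updateRoutingEntry(newRouting);
        //   shard.updatePrimaryTerm(newPrimaryTerm, primaryReplicaSyncer::resync);          // primaries only
        //   shard.updateAllocationIdsFromMaster(clusterStateVersion, activeIds, initializingIds);
        //
        // After this commit, the same update is a single call; routing, primary term, and the
        // tracked allocation ids are applied together under IndexShard's internal mutex.
        shard.updateShardState(
                newRouting,
                newPrimaryTerm,
                primaryReplicaSyncer::resync,
                clusterStateVersion,
                activeIds,
                initializingIds);
    }
}

The point of the single entry point, as the updateShardState implementation in the diff shows, is that the routing entry, the primary term, and the allocation-id bookkeeping are all mutated under one acquisition of the shard's mutex instead of three separate ones.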
@@ -19,6 +19,7 @@

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
@@ -283,7 +284,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
persistMetadata(shardRouting, null);
persistMetadata(path, indexSettings, shardRouting, null, logger);
}

public Store store() {
@@ -342,86 +343,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return this.primaryTerm;
}

/**
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
* @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
@Override
public void updatePrimaryTerm(final long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newPrimaryTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
// in one state causing it's term to be incremented. Note that if both current shard state and new
// shard state are initializing, we could replace the current shard and reinitialize it. It is however
// possible that this shard is being started. This can happen if:
// 1) Shard is post recovery and sends shard started to the master
// 2) Node gets disconnected and rejoins
// 3) Master assigns the shard back to the node
// 4) Master processes the shard started and starts the shard
// 5) The node process the cluster state where the shard is both started and primary term is incremented.
//
// We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
// potentially preventing re-allocation.
assert shardRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
/*
* Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
throw new IllegalStateException("cannot start resync while it's already in progress");
}
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
logger.info("primary-replica resync completed with {} operations",
resyncTask.getResyncedOperations());
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
}

@Override
public void onFailure(Exception e) {
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
if (state == IndexShardState.CLOSED) {
// ignore, shutting down
} else {
failShard("exception during primary-replica resync", e);
}
}
});
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
},
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}
}

/**
* Returns the latest cluster routing entry received with this shard.
*/
@@ -434,50 +355,29 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return cachingPolicy;
}

/**
* Updates the shards routing entry. This mutate the shards internal state depending
* on the changes that get introduced by the new routing value. This method will persist shard level metadata.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(ShardRouting newRouting) throws IOException {

@Override
public void updateShardState(final ShardRouting newRouting,
final long newPrimaryTerm,
final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
final long applyingClusterStateVersion,
final Set<String> activeAllocationIds,
final Set<String> initializingAllocationIds) throws IOException {
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;
updateRoutingEntry(newRouting);

if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId());
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+ currentRouting + ", new " + newRouting);
}
if (shardRouting.primary()) {
updatePrimaryTerm(newPrimaryTerm, primaryReplicaSyncer);

if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Exception e) {
logger.debug("failed to refresh due to move to cluster wide started", e);
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromMaster(
applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
this.shardRouting = newRouting;
persistMetadata(newRouting, currentRouting);
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
@@ -487,6 +387,117 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}

private void updateRoutingEntry(ShardRouting newRouting) throws IOException {
assert Thread.holdsLock(mutex);
final ShardRouting currentRouting = this.shardRouting;

if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId());
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+ currentRouting + ", new " + newRouting);
}

if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Exception e) {
logger.debug("failed to refresh due to move to cluster wide started", e);
}
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
this.shardRouting = newRouting;
persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
}

private void updatePrimaryTerm(
final long newPrimaryTerm, final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert Thread.holdsLock(mutex);
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
if (newPrimaryTerm != primaryTerm) {
/* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
* in one state causing it's term to be incremented. Note that if both current shard state and new
* shard state are initializing, we could replace the current shard and reinitialize it. It is however
* possible that this shard is being started. This can happen if:
* 1) Shard is post recovery and sends shard started to the master
* 2) Node gets disconnected and rejoins
* 3) Master assigns the shard back to the node
* 4) Master processes the shard started and starts the shard
* 5) The node process the cluster state where the shard is both started and primary term is incremented.
*
* We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
* potentially preventing re-allocation.
*/
assert shardRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
/*
* Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
throw new IllegalStateException("cannot start resync while it's already in progress");
}
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
logger.info("primary-replica resync completed with {} operations",
resyncTask.getResyncedOperations());
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
}

@Override
public void onFailure(Exception e) {
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
if (state == IndexShardState.CLOSED) {
// ignore, shutting down
} else {
failShard("exception during primary-replica resync", e);
}
}
});
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
},
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}

/**
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
@@ -1683,25 +1694,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
seqNoService.updateGlobalCheckpointOnReplica(globalCheckpoint);
}

/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
* for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
verifyPrimary();
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
@@ -1972,11 +1964,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return engineFactory.newReadWriteEngine(config);
}

// pkg private for testing
void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
private static void persistMetadata(
final ShardPath shardPath,
final IndexSettings indexSettings,
final ShardRouting newRouting,
final @Nullable ShardRouting currentRouting,
final Logger logger) throws IOException {
assert newRouting != null : "newRouting must not be null";

// only persist metadata if routing information that is persisted in shard state metadata actually changed
final ShardId shardId = newRouting.shardId();
if (currentRouting == null
|| currentRouting.primary() != newRouting.primary()
|| currentRouting.allocationId().equals(newRouting.allocationId()) == false) {
@@ -1988,17 +1985,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
}
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath().getShardStatePath());
final ShardStateMetaData newShardStateMetadata =
new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId());
ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath());
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
}
}

private String getIndexUUID() {
return indexSettings.getUUID();
}

private DocumentMapperForType docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type);
}
@@ -87,7 +87,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

@@ -558,21 +557,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;

try {
shard.updateRoutingEntry(shardRouting);
if (shardRouting.primary()) {
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
/*
* Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on
* old nodes will go through a file-based recovery which will also transfer sequence number information.
*/
final Set<String> activeIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds);
}
final long primaryTerm = clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
/*
* Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on old
* nodes will go through a file-based recovery which will also transfer sequence number information.
*/
final Set<String> activeIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updateShardState(
shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), activeIds, initializingIds);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
return;
@@ -739,33 +734,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RecoveryState recoveryState();

/**
* Updates the shards routing entry. This mutate the shards internal state depending
* on the changes that get introduced by the new routing value. This method will persist shard level metadata.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
void updateRoutingEntry(ShardRouting shardRouting) throws IOException;

/**
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
* @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
void updatePrimaryTerm(long primaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);

/**
* Notifies the service of the current allocation ids in the cluster state.
* Updates the shard state based on an incoming cluster state:
* - Updates and persists the new routing value.
* - Updates the primary term if this shard is a primary.
* - Updates the allocation ids that are tracked by the shard if it is a primary.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
* @param shardRouting the new routing entry
* @param primaryTerm the new primary term
* @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
void updateAllocationIdsFromMaster(
long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
void updateShardState(ShardRouting shardRouting,
long primaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> activeAllocationIds,
Set<String> initializingAllocationIds) throws IOException;
}

public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {
@@ -44,6 +44,7 @@ import org.elasticsearch.action.support.replication.TransportWriteActionTestHelp
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
@@ -223,9 +224,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
clusterStateVersion++;
updateAllocationIDsOnPrimary();
HashSet<String> activeIds = new HashSet<>();
activeIds.addAll(activeIds());
activeIds.add(primary.routingEntry().allocationId().getId());
HashSet<String> initializingIds = new HashSet<>();
initializingIds.addAll(initializingIds());
initializingIds.remove(primary.routingEntry().allocationId().getId());
primary.updateShardState(ShardRoutingHelper.moveToStarted(primary.routingEntry()), primary.getPrimaryTerm(), null,
++clusterStateVersion, activeIds, initializingIds);
for (final IndexShard replica : replicas) {
recoverReplica(replica);
}
@@ -239,7 +245,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return replica;
}

public synchronized void addReplica(IndexShard replica) {
public synchronized void addReplica(IndexShard replica) throws IOException {
assert shardRoutings().stream()
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
@@ -278,29 +284,43 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
assertTrue(replicas.remove(replica));
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());

PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@Override
public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
listener.onResponse(resyncTask);
fut.onResponse(resyncTask);
}
HashSet<String> activeIds = new HashSet<>();
activeIds.addAll(activeIds());
activeIds.add(replica.routingEntry().allocationId().getId());
HashSet<String> initializingIds = new HashSet<>();
initializingIds.addAll(initializingIds());
initializingIds.remove(replica.routingEntry().allocationId().getId());
primary.updateShardState(replica.routingEntry().moveActiveReplicaToPrimary(),
newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@Override
public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
listener.onResponse(resyncTask);
fut.onResponse(resyncTask);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
fut.onFailure(e);
}
}), ++clusterStateVersion, activeIds, initializingIds);

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
fut.onFailure(e);
}
}));
clusterStateVersion++;
updateAllocationIDsOnPrimary();
return fut;
}

synchronized boolean removeReplica(IndexShard replica) {
private synchronized Set<String> activeIds() {
return shardRoutings().stream()
.filter(ShardRouting::active).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
}

private synchronized Set<String> initializingIds() {
return shardRoutings().stream()
.filter(ShardRouting::initializing).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
}

synchronized boolean removeReplica(IndexShard replica) throws IOException {
final boolean removed = replicas.remove(replica);
if (removed) {
clusterStateVersion++;
@@ -401,17 +421,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
}

private void updateAllocationIDsOnPrimary() {
Set<String> active = new HashSet<>();
Set<String> initializing = new HashSet<>();
for (ShardRouting shard: shardRoutings()) {
if (shard.active()) {
active.add(shard.allocationId().getId());
} else {
initializing.add(shard.allocationId().getId());
}
}
primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing);
private void updateAllocationIDsOnPrimary() throws IOException {
primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, clusterStateVersion,
activeIds(), initializingIds());
}
}
@@ -525,16 +525,16 @@ public class IndexShardIT extends ESSingleNodeTestCase {
}


public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
return newShard;
}

public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
IndexingOperationListener... listeners) throws IOException {
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
@@ -192,7 +192,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardStateMetaData shardStateMetaData = load(logger, shardStatePath);
assertEquals(getShardStateMetadata(shard), shardStateMetaData);
ShardRouting routing = shard.shardRouting;
shard.updateRoutingEntry(routing);
IndexShardTestCase.updateRoutingEntry(shard, routing);

shardStateMetaData = load(logger, shardStatePath);
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
@@ -200,7 +200,7 @@ public class IndexShardTests extends IndexShardTestCase {
new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));

routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L);
shard.updateRoutingEntry(routing);
IndexShardTestCase.updateRoutingEntry(shard, routing);
shardStateMetaData = load(logger, shardStatePath);
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData,
@@ -345,8 +345,8 @@ public class IndexShardTests extends IndexShardTestCase {
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.emptySet(), Collections.emptySet());

final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -436,8 +436,8 @@ public class IndexShardTests extends IndexShardTestCase {
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.emptySet(), Collections.emptySet());

/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -478,8 +478,8 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting replicaRouting = indexShard.routingEntry();
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.emptySet(), Collections.emptySet());
} else {
indexShard = newStartedShard(true);
}
@@ -545,7 +545,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting routing = indexShard.routingEntry();
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexShard.relocated("test", primaryContext -> {});
engineClosed = false;
break;
@@ -830,7 +830,7 @@ public class IndexShardTests extends IndexShardTestCase {
snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));

newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());

snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
@@ -1088,7 +1088,7 @@ public class IndexShardTests extends IndexShardTestCase {

public void testLockingBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
CountDownLatch latch = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> {
latch.countDown();
@@ -1119,7 +1119,7 @@ public class IndexShardTests extends IndexShardTestCase {

public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> {
try {
shard.relocated("simulated recovery", primaryContext -> {});
@@ -1153,7 +1153,7 @@ public class IndexShardTests extends IndexShardTestCase {

public void testStressRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads];
CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads);
@@ -1208,17 +1208,17 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.relocated("test", primaryContext -> {});
expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting));
closeShards(shard);
}

public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.updateRoutingEntry(originalRouting);
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {}));
closeShards(shard);
}
@@ -1227,7 +1227,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
AtomicReference<Exception> relocationException = new AtomicReference<>();
Thread relocationThread = new Thread(new AbstractRunnable() {
@@ -1253,7 +1253,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
shard.updateRoutingEntry(originalRouting);
IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
}
});
cancellingThread.start();
@@ -1291,7 +1291,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 1);
closeShards(newShard);
}
@@ -1330,7 +1330,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
}
assertEquals(1, numNoops);
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 1);
assertDocCount(shard, 2);
closeShards(newShard, shard);
@@ -1354,7 +1354,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
closeShards(newShard);
}
@@ -1397,7 +1397,7 @@ public class IndexShardTests extends IndexShardTestCase {
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore());

newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
// we can't issue this request through a client because of the inconsistencies we created with the cluster state
// doing it directly instead
@@ -1413,11 +1413,11 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting origRouting = shard.routingEntry();
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
shard.updateRoutingEntry(inRecoveryRouting);
IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
shard.relocated("simulate mark as relocated", primaryContext -> {});
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try {
shard.updateRoutingEntry(origRouting);
IndexShardTestCase.updateRoutingEntry(shard, origRouting);
fail("Expected IndexShardRelocatedException");
} catch (IndexShardRelocatedException expected) {
}
@@ -1466,7 +1466,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
}));

target.updateRoutingEntry(routing.moveToStarted());
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertDocs(target, "0");

closeShards(source, target);
@@ -1843,7 +1843,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(file.recovered(), file.length());
}
}
targetShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
assertDocCount(targetShard, 2);
}
// now check that it's persistent ie. that the added shards are committed
@@ -62,7 +62,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;

String allocationId = shard.routingEntry().allocationId().getId();
shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet());
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
Collections.emptySet());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
@@ -31,6 +31,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardIT;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@@ -450,7 +451,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore());
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
newShard.updateRoutingEntry(routing.moveToStarted());
IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted());
} finally {
newShard.close("simon says", false);
}
@@ -31,6 +31,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.indices.recovery.RecoveryState;
@@ -130,14 +131,14 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
.updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting);
shard.updateRoutingEntry(newRouting);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null));
shard.recoverFromStore();
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
shard.updateRoutingEntry(newRouting);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(6, counter.get());
} finally {
indicesService.removeIndex(idx, DELETED, "simon says");
core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -345,17 +346,12 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}

@Override
public ShardRouting routingEntry() {
return shardRouting;
}

@Override
public IndexShardState state() {
return null;
}

@Override
public void updateRoutingEntry(ShardRouting shardRouting) throws IOException {
public void updateShardState(ShardRouting shardRouting,
long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> activeAllocationIds,
Set<String> initializingAllocationIds) throws IOException {
failRandomly();
assertThat(this.shardId(), equalTo(shardRouting.shardId()));
assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting));
@@ -364,20 +360,22 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
shardRouting.active());
}
this.shardRouting = shardRouting;
if (shardRouting.primary()) {
term = newPrimaryTerm;
this.clusterStateVersion = applyingClusterStateVersion;
this.activeAllocationIds = activeAllocationIds;
this.initializingAllocationIds = initializingAllocationIds;
}
}

@Override
public void updatePrimaryTerm(final long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
term = newPrimaryTerm;
public ShardRouting routingEntry() {
return shardRouting;
}

@Override
public void updateAllocationIdsFromMaster(
long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
this.clusterStateVersion = applyingClusterStateVersion;
this.activeAllocationIds = activeAllocationIds;
this.initializingAllocationIds = initializingAllocationIds;
public IndexShardState state() {
return null;
}

public void updateTerm(long newTerm) {
@@ -355,7 +355,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry()));
}

public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRouting) throws IOException {
shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, Collections.emptySet(), Collections.emptySet());
}

protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
@@ -424,7 +428,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
updateRoutingEntry(replica, ShardRoutingHelper.moveToStarted(replica.routingEntry()));
}

private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {