Only use IndexShard instance to lookup recovery status
make sure we use the instance itself to look it up, and not the shard id, as we might get another instance leftover from #6825
This commit is contained in:
parent
f480969503
commit
43a5cbe9be
|
@ -157,7 +157,7 @@ public class TransportRecoveryAction extends
|
||||||
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
|
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
|
||||||
|
|
||||||
if (recoveryStatus == null) {
|
if (recoveryStatus == null) {
|
||||||
recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
|
recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recoveryStatus != null) {
|
if (recoveryStatus != null) {
|
||||||
|
|
|
@ -544,7 +544,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
|
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
|
||||||
} else if (isPeerRecovery(shardRouting)) {
|
} else if (isPeerRecovery(shardRouting)) {
|
||||||
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
|
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
|
||||||
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
|
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
|
||||||
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
|
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
|
||||||
// we have an ongoing recovery, find the source based on current routing and compare them
|
// we have an ongoing recovery, find the source based on current routing and compare them
|
||||||
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
|
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
|
||||||
|
|
|
@ -109,16 +109,15 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public RecoveryStatus recoveryStatus(ShardId shardId) {
|
public RecoveryStatus recoveryStatus(IndexShard indexShard) {
|
||||||
RecoveryStatus peerRecoveryStatus = findRecoveryByShardId(shardId);
|
RecoveryStatus recoveryStatus = findRecoveryByShard(indexShard);
|
||||||
if (peerRecoveryStatus == null) {
|
if (recoveryStatus == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
// update how long it takes if we are still recovering...
|
if (recoveryStatus.recoveryState().getTimer().startTime() > 0 && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
|
||||||
if (peerRecoveryStatus.recoveryState().getTimer().startTime() > 0 && peerRecoveryStatus.stage() != RecoveryState.Stage.DONE) {
|
recoveryStatus.recoveryState().getTimer().time(System.currentTimeMillis() - recoveryStatus.recoveryState().getTimer().startTime());
|
||||||
peerRecoveryStatus.recoveryState().getTimer().time(System.currentTimeMillis() - peerRecoveryStatus.recoveryState().getTimer().startTime());
|
|
||||||
}
|
}
|
||||||
return peerRecoveryStatus;
|
return recoveryStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cancelRecovery(IndexShard indexShard) {
|
public void cancelRecovery(IndexShard indexShard) {
|
||||||
|
@ -179,7 +178,7 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void retryRecovery(final StartRecoveryRequest request, TimeValue retryAfter, final RecoveryStatus status, final RecoveryListener listener) {
|
public void retryRecovery(final StartRecoveryRequest request, TimeValue retryAfter, final RecoveryStatus status, final RecoveryListener listener) {
|
||||||
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC ,new Runnable() {
|
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
doRecovery(request, status, listener);
|
doRecovery(request, status, listener);
|
||||||
|
@ -319,16 +318,6 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
|
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private RecoveryStatus findRecoveryByShardId(ShardId shardId) {
|
|
||||||
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
|
|
||||||
if (recoveryStatus.shardId.equals(shardId)) {
|
|
||||||
return recoveryStatus;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private RecoveryStatus findRecoveryByShard(IndexShard indexShard) {
|
private RecoveryStatus findRecoveryByShard(IndexShard indexShard) {
|
||||||
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
|
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
|
||||||
|
|
Loading…
Reference in New Issue