improve moving from post_recovery to started
We need to move to started from post_recovery on cluster-level changes, and we need to make sure we handle a global state change to relocating, which can happen (and not pass through started).
parent 6856cfc5e3
commit 5f2b7dc266
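To make the intent concrete, here is a minimal, self-contained sketch of the transition rule described in the message above. The names (ShardStateTransitionSketch, shouldMoveToStarted and the two enums) are illustrative stand-ins, not the actual Elasticsearch types; the real change is in the InternalIndexShard diff further below.

// Illustrative only: hypothetical enums and helper, not the Elasticsearch classes.
public class ShardStateTransitionSketch {

    enum IndexShardState { POST_RECOVERY, STARTED, CLOSED }

    enum ShardRoutingState { INITIALIZING, STARTED, RELOCATING, UNASSIGNED }

    // A shard still in POST_RECOVERY must move to STARTED when the cluster-level routing
    // already says STARTED, but also when it says RELOCATING, because the global state can
    // jump straight to RELOCATING without this node ever observing an intermediate STARTED.
    static boolean shouldMoveToStarted(IndexShardState localState, ShardRoutingState globalRouting) {
        if (localState != IndexShardState.POST_RECOVERY) {
            return false;
        }
        return globalRouting == ShardRoutingState.STARTED || globalRouting == ShardRoutingState.RELOCATING;
    }

    public static void main(String[] args) {
        System.out.println(shouldMoveToStarted(IndexShardState.POST_RECOVERY, ShardRoutingState.STARTED));     // true
        System.out.println(shouldMoveToStarted(IndexShardState.POST_RECOVERY, ShardRoutingState.RELOCATING));  // true
        System.out.println(shouldMoveToStarted(IndexShardState.STARTED, ShardRoutingState.RELOCATING));        // false
    }
}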
@@ -483,16 +483,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
         void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
             if (!fromClusterEvent) {
                 // make it threaded operation so we fork on the discovery listener thread
-
                 request.beforeLocalFork();
                 request.operationThreaded(true);
                 clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
                     @Override
                     public void postAdded() {
-                        logger.trace("listener to cluster state added, trying to index again");
                         // check if state version changed while we were adding this listener
-                        if (clusterState.version() != clusterService.state().version()) {
-                            logger.trace("state change while we were trying to add listener, trying to index again");
+                        long sampledVersion = clusterState.version();
+                        long currentVersion = clusterService.state().version();
+                        if (sampledVersion != currentVersion) {
+                            logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
                             if (start(true)) {
                                 // if we managed to start and perform the operation on the primary, we can remove this listener
                                 clusterService.remove(this);
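The postAdded() re-check in the hunk above exists because a cluster state change can land between sampling the state and registering the listener, in which case clusterChanged() is never invoked for it. The following standalone sketch, with hypothetical names (MiniClusterService, StateListener), illustrates that register-then-recheck pattern in isolation; it is not the Elasticsearch API.

// Hypothetical mini cluster service, not the Elasticsearch API.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

public class RegisterThenRecheckSketch {

    interface StateListener {
        void stateChanged(long newVersion);
    }

    static class MiniClusterService {
        private final AtomicLong version = new AtomicLong(0);
        private final List<StateListener> listeners = new CopyOnWriteArrayList<>();

        long currentVersion() {
            return version.get();
        }

        // bump the version and notify everyone that is registered *at this point*
        void publish(long newVersion) {
            version.set(newVersion);
            for (StateListener listener : listeners) {
                listener.stateChanged(newVersion);
            }
        }

        void add(StateListener listener) {
            listeners.add(listener);
        }
    }

    public static void main(String[] args) {
        MiniClusterService clusterService = new MiniClusterService();
        long sampledVersion = clusterService.currentVersion();

        // a change sneaks in before the listener is registered: nobody is notified
        clusterService.publish(1);

        clusterService.add(newVersion -> System.out.println("retrying on version " + newVersion));

        // the re-check after registration (the postAdded() equivalent) closes the gap
        long currentVersion = clusterService.currentVersion();
        if (sampledVersion != currentVersion) {
            System.out.println("missed a change: sampled_version [" + sampledVersion
                    + "], current_version [" + currentVersion + "], retrying now");
        }
    }
}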
@@ -508,7 +508,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 
                     @Override
                     public void clusterChanged(ClusterChangedEvent event) {
-                        logger.trace("cluster changed (version {}), trying to index again", event.state().version());
+                        logger.trace("cluster changed (version {}), trying to start again", event.state().version());
                         if (start(true)) {
                             // if we managed to start and perform the operation on the primary, we can remove this listener
                             clusterService.remove(this);
@@ -548,7 +548,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 if (retryPrimaryException(e)) {
                     primaryOperationStarted.set(false);
                     logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
-                    retry(false, null);
+                    retry(false, e);
                     return;
                 }
                 if (e instanceof ElasticSearchException && ((ElasticSearchException) e).status() == RestStatus.CONFLICT) {
@@ -273,24 +273,37 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
             }
         }
 
-        // make sure we refresh on state change due to cluster state changes
-        if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) {
-            try {
-                engine.refresh(new Engine.Refresh("cluster_state_started").force(true));
-            } catch (Throwable t) {
-                logger.debug("failed to refresh due to move to cluster wide started", t);
-            }
-            synchronized (mutex) {
-                if (state != IndexShardState.POST_RECOVERY) {
-                    logger.debug("suspected wrong state when acting on cluster state started state, current state {}", state);
+        if (state == IndexShardState.POST_RECOVERY) {
+            // if the state is started or relocating (cause it might move right away from started to relocating)
+            // then move to STARTED
+            if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
+                // we want to refresh *before* we move to internal STARTED state
+                try {
+                    engine.refresh(new Engine.Refresh("cluster_state_started").force(true));
+                } catch (Throwable t) {
+                    logger.debug("failed to refresh due to move to cluster wide started", t);
+                }
+
+                boolean movedToStarted = false;
+                synchronized (mutex) {
+                    // do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
+                    if (state == IndexShardState.POST_RECOVERY) {
+                        logger.debug("state: [{}]->[{}], reason [global state is [{}]]", state, IndexShardState.STARTED, newRouting.state());
+                        state = IndexShardState.STARTED;
+                        movedToStarted = true;
+                    } else {
+                        logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
+                    }
+                }
+                if (movedToStarted) {
+                    indicesLifecycle.afterIndexShardStarted(this);
+                }
-                logger.debug("state: [{}]->[{}], reason [global state moved to started]", state, IndexShardState.STARTED);
-                state = IndexShardState.STARTED;
             }
-            indicesLifecycle.afterIndexShardStarted(this);
         }
 
         this.shardRouting = newRouting;
         indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting);
 
         return this;
     }
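The new InternalIndexShard logic refreshes before flipping the internal state, re-checks POST_RECOVERY under the mutex so only one caller performs the transition, and fires the lifecycle callback only after the lock is released. Below is a condensed, self-contained sketch of that pattern with hypothetical names (PostRecoveryToStartedSketch, maybeMoveToStarted), using plain Runnable stand-ins for the refresh and the afterIndexShardStarted notification; it is not the Elasticsearch class.

// Hypothetical condensed shard, not the Elasticsearch class.
public class PostRecoveryToStartedSketch {

    enum State { POST_RECOVERY, STARTED }

    private final Object mutex = new Object();
    private volatile State state = State.POST_RECOVERY;

    void maybeMoveToStarted(Runnable refresh, Runnable afterShardStarted) {
        if (state != State.POST_RECOVERY) {
            return; // only a POST_RECOVERY shard is eligible for this transition
        }
        // refresh *before* the internal state flips to STARTED, and outside the mutex
        refresh.run();

        boolean movedToStarted = false;
        synchronized (mutex) {
            // re-check under the mutex so concurrent callers cannot both perform the transition
            if (state == State.POST_RECOVERY) {
                state = State.STARTED;
                movedToStarted = true;
            }
        }
        // notify listeners outside the mutex so the lock is not held during callbacks
        if (movedToStarted) {
            afterShardStarted.run();
        }
    }

    public static void main(String[] args) {
        PostRecoveryToStartedSketch shard = new PostRecoveryToStartedSketch();
        shard.maybeMoveToStarted(
                () -> System.out.println("refreshing"),
                () -> System.out.println("afterIndexShardStarted"));
        // the second call is a no-op: the shard has already left POST_RECOVERY
        shard.maybeMoveToStarted(
                () -> System.out.println("refreshing"),
                () -> System.out.println("afterIndexShardStarted"));
    }
}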