diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index 04eb14669e7..a8960387e6f 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -206,10 +206,19 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste if (enforceRecoverAfterTime && recoverAfterTime != null) { if (scheduledRecovery.compareAndSet(false, true)) { logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason); - threadPool.schedule(() -> { - if (recovered.compareAndSet(false, true)) { - logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); - recoveryRunnable.run(); + threadPool.schedule(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("delayed state recovery failed", e); + resetRecoveredFlags(); + } + + @Override + protected void doRun() { + if (recovered.compareAndSet(false, true)) { + logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); + recoveryRunnable.run(); + } } }, recoverAfterTime, ThreadPool.Names.GENERIC); } @@ -218,10 +227,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(final Exception e) { - logger.warn("Recovery failed", e); - // we reset `recovered` in the listener don't reset it here otherwise there might be a race - // that resets it to false while a new recover is already running? - GatewayService.this.onFailure("state recovery failed: " + e.getMessage()); + logger.warn("state recovery failed", e); + resetRecoveredFlags(); } @Override @@ -233,11 +240,9 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste } } - private void onFailure(final String message) { + private void resetRecoveredFlags() { recovered.set(false); scheduledRecovery.set(false); - // don't remove the block here, we don't want to allow anything in such a case - logger.info("metadata state not restored, reason: {}", message); } class RecoverStateUpdateTask extends ClusterStateUpdateTask { @@ -257,10 +262,16 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size()); } + @Override + public void onNoLongerMaster(String source) { + logger.debug("stepped down as master before recovering state [{}]", source); + resetRecoveredFlags(); + } + @Override public void onFailure(final String source, final Exception e) { logger.info(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - GatewayService.this.onFailure("failed to update cluster state"); + resetRecoveredFlags(); } } @@ -280,7 +291,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste @Override public void onFailure(final String msg) { - GatewayService.this.onFailure(msg); + logger.info("state recovery failed: {}", msg); + resetRecoveredFlags(); } }