From f1c503176675a3b0a937c3595d2c4dd846adf054 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 14 Jan 2020 14:08:31 +0100 Subject: [PATCH] Fix queuing in AsyncLucenePersistedState (#50958) The logic in AsyncLucenePersistedState was flawed, unexpectedly queuing up two update tasks in parallel. --- .../gateway/GatewayMetaState.java | 89 ++++++++++--------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 69243b075bf..8ac1b73ae5d 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -46,7 +46,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.DiscoveryModule; @@ -356,7 +355,9 @@ public class GatewayMetaState implements Closeable { } else { logger.trace("queuing term update (setting term to {})", currentTerm); newCurrentTermQueued = true; - scheduleUpdate(); + if (newStateQueued == false) { + scheduleUpdate(); + } } } } @@ -370,55 +371,57 @@ public class GatewayMetaState implements Closeable { } else { logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version()); newStateQueued = true; - scheduleUpdate(); + if (newCurrentTermQueued == false) { + scheduleUpdate(); + } } } } private void scheduleUpdate() { assert Thread.holdsLock(mutex); - try { - threadPoolExecutor.execute(new AbstractRunnable() { + assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty"; + threadPoolExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred when storing new meta data", e); - } - - @Override - protected void doRun() { - final Long term; - final ClusterState clusterState; - synchronized (mutex) { - if (newCurrentTermQueued) { - term = getCurrentTerm(); - newCurrentTermQueued = false; - } else { - term = null; - } - if (newStateQueued) { - clusterState = getLastAcceptedState(); - newStateQueued = false; - } else { - clusterState = null; - } - } - // write current term before last accepted state so that it is never below term in last accepted state - if (term != null) { - persistedState.setCurrentTerm(term); - } - if (clusterState != null) { - persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState)); - } - } - }); - } catch (EsRejectedExecutionException e) { - // ignore cases where we are shutting down..., there is really nothing interesting to be done here... - if (threadPoolExecutor.isShutdown() == false) { - assert false : "only expect rejections when shutting down"; - throw e; + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred when storing new meta data", e); } - } + + @Override + public void onRejection(Exception e) { + assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down"; + } + + @Override + protected void doRun() { + final Long term; + final ClusterState clusterState; + synchronized (mutex) { + if (newCurrentTermQueued) { + term = getCurrentTerm(); + logger.trace("resetting newCurrentTermQueued"); + newCurrentTermQueued = false; + } else { + term = null; + } + if (newStateQueued) { + clusterState = getLastAcceptedState(); + logger.trace("resetting newStateQueued"); + newStateQueued = false; + } else { + clusterState = null; + } + } + // write current term before last accepted state so that it is never below term in last accepted state + if (term != null) { + persistedState.setCurrentTerm(term); + } + if (clusterState != null) { + persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState)); + } + } + }); } static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =