Fix queuing in AsyncLucenePersistedState (#50958)
The logic in AsyncLucenePersistedState was flawed: it could unexpectedly queue up two update tasks in parallel, even though a single queued task is enough to persist both a pending term update and a pending cluster state update.
parent 91d7b446a0
commit f1c5031766
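As a rough illustration of the problem the diff below addresses: each of setCurrentTerm and setLastAcceptedState used to call scheduleUpdate() unconditionally, so a term update followed by a state update could leave two tasks queued on the single writer thread even though one task drains both pending flags. Below is a minimal, self-contained Java sketch of the fixed pattern, assuming nothing beyond the diff itself; the class QueuingSketch and its fields are invented for illustration and merely stand in for the real AsyncLucenePersistedState, which persists actual cluster state via Lucene.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified sketch of the queuing behaviour; not the Elasticsearch class itself.
class QueuingSketch {

    private final Object mutex = new Object();
    // Stand-in for the single-threaded executor used by AsyncLucenePersistedState.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private boolean newCurrentTermQueued;
    private boolean newStateQueued;

    private long currentTerm;
    private long clusterStateVersion;

    public void setCurrentTerm(long term) {
        synchronized (mutex) {
            currentTerm = term;
            if (newCurrentTermQueued == false) {
                newCurrentTermQueued = true;
                // The fix: only schedule when nothing is queued yet, because a single
                // queued task persists both a pending term and a pending state.
                if (newStateQueued == false) {
                    scheduleUpdate();
                }
            }
        }
    }

    public void setLastAcceptedState(long version) {
        synchronized (mutex) {
            clusterStateVersion = version;
            if (newStateQueued == false) {
                newStateQueued = true;
                if (newCurrentTermQueued == false) {
                    scheduleUpdate();
                }
            }
        }
    }

    private void scheduleUpdate() {
        // With the guards above, at most one update task is ever queued at a time.
        executor.execute(() -> {
            final Long term;
            final Long version;
            synchronized (mutex) {
                if (newCurrentTermQueued) {
                    term = currentTerm;
                    newCurrentTermQueued = false;
                } else {
                    term = null;
                }
                if (newStateQueued) {
                    version = clusterStateVersion;
                    newStateQueued = false;
                } else {
                    version = null;
                }
            }
            // Write the term before the state, mirroring the comment in the diff:
            // the persisted term must never be below the term in the persisted state.
            if (term != null) {
                System.out.println("persisting term " + term);
            }
            if (version != null) {
                System.out.println("persisting cluster state version " + version);
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        QueuingSketch sketch = new QueuingSketch();
        sketch.setCurrentTerm(5);         // queues one update task
        sketch.setLastAcceptedState(42);  // reuses the queued task, or schedules a new one if it already ran
        sketch.shutdown();
    }
}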
@@ -46,7 +46,6 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.discovery.DiscoveryModule;
@@ -356,7 +355,9 @@ public class GatewayMetaState implements Closeable {
                 } else {
                     logger.trace("queuing term update (setting term to {})", currentTerm);
                     newCurrentTermQueued = true;
-                    scheduleUpdate();
+                    if (newStateQueued == false) {
+                        scheduleUpdate();
+                    }
                 }
             }
         }
@@ -370,55 +371,57 @@ public class GatewayMetaState implements Closeable {
                 } else {
                     logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version());
                     newStateQueued = true;
-                    scheduleUpdate();
+                    if (newCurrentTermQueued == false) {
+                        scheduleUpdate();
+                    }
                 }
             }
         }
 
         private void scheduleUpdate() {
            assert Thread.holdsLock(mutex);
-            try {
-                threadPoolExecutor.execute(new AbstractRunnable() {
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error("Exception occurred when storing new meta data", e);
-                    }
-
-                    @Override
-                    protected void doRun() {
-                        final Long term;
-                        final ClusterState clusterState;
-                        synchronized (mutex) {
-                            if (newCurrentTermQueued) {
-                                term = getCurrentTerm();
-                                newCurrentTermQueued = false;
-                            } else {
-                                term = null;
-                            }
-                            if (newStateQueued) {
-                                clusterState = getLastAcceptedState();
-                                newStateQueued = false;
-                            } else {
-                                clusterState = null;
-                            }
-                        }
-                        // write current term before last accepted state so that it is never below term in last accepted state
-                        if (term != null) {
-                            persistedState.setCurrentTerm(term);
-                        }
-                        if (clusterState != null) {
-                            persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
-                        }
-                    }
-                });
-            } catch (EsRejectedExecutionException e) {
-                // ignore cases where we are shutting down..., there is really nothing interesting to be done here...
-                if (threadPoolExecutor.isShutdown() == false) {
-                    assert false : "only expect rejections when shutting down";
-                    throw e;
-                }
-            }
+            assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty";
+            threadPoolExecutor.execute(new AbstractRunnable() {
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("Exception occurred when storing new meta data", e);
+                }
+
+                @Override
+                public void onRejection(Exception e) {
+                    assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down";
+                }
+
+                @Override
+                protected void doRun() {
+                    final Long term;
+                    final ClusterState clusterState;
+                    synchronized (mutex) {
+                        if (newCurrentTermQueued) {
+                            term = getCurrentTerm();
+                            logger.trace("resetting newCurrentTermQueued");
+                            newCurrentTermQueued = false;
+                        } else {
+                            term = null;
+                        }
+                        if (newStateQueued) {
+                            clusterState = getLastAcceptedState();
+                            logger.trace("resetting newStateQueued");
+                            newStateQueued = false;
+                        } else {
+                            clusterState = null;
+                        }
+                    }
+                    // write current term before last accepted state so that it is never below term in last accepted state
+                    if (term != null) {
+                        persistedState.setCurrentTerm(term);
+                    }
+                    if (clusterState != null) {
+                        persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
+                    }
+                }
+            });
         }
 
         static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =
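The other part of the change replaces the try/catch around threadPoolExecutor.execute with an onRejection override on the submitted AbstractRunnable, keeping the policy that a rejection is only acceptable while the executor is shutting down; the new assertion that the executor queue is empty also documents the invariant the guards above establish, namely that at most one update task is ever queued. A rough stand-alone sketch of that submission pattern in plain JDK terms follows; RejectionAwareRunnable and submitTo are invented names, and the real code relies on Elasticsearch's AbstractRunnable and EsThreadPoolExecutor machinery rather than this wrapper.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Generic illustration only: the rejection policy lives in one hook instead of a
// try/catch at every submission site.
abstract class RejectionAwareRunnable implements Runnable {

    protected abstract void doRun() throws Exception;

    protected abstract void onFailure(Exception e);

    // Rejections are only expected while the executor is shutting down.
    protected void onRejection(RejectedExecutionException e, ExecutorService executor) {
        assert executor.isShutdown() : "only expect rejections when shutting down";
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception e) {
            onFailure(e);
        }
    }

    public final void submitTo(ExecutorService executor) {
        try {
            executor.execute(this);
        } catch (RejectedExecutionException e) {
            onRejection(e, executor);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // force a rejection to exercise the hook
        new RejectionAwareRunnable() {
            @Override
            protected void doRun() {
                System.out.println("ran");
            }

            @Override
            protected void onFailure(Exception e) {
                e.printStackTrace();
            }
        }.submitTo(executor);
    }
}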