diff --git a/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java b/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java index 364deb98e74..7b123b66107 100644 --- a/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java +++ b/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java @@ -22,7 +22,7 @@ package org.elasticsearch.cluster; * Enables listening to master changes events of the local node (when the local node becomes the master, and when the local * node cease being a master). */ -public interface LocalNodeMasterListener { +public interface LocalNodeMasterListener extends ClusterStateListener { /** * Called when local node is elected to be the master @@ -33,5 +33,16 @@ public interface LocalNodeMasterListener { * Called when the local node used to be the master, a new master was elected and it's no longer the local node. */ void offMaster(); + + @Override + default void clusterChanged(ClusterChangedEvent event) { + final boolean wasMaster = event.previousState().nodes().isLocalNodeElectedMaster(); + final boolean isMaster = event.localNodeMaster(); + if (wasMaster == false && isMaster) { + onMaster(); + } else if (wasMaster && isMaster == false) { + offMaster(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 35d9ef567ae..87bed057b2a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -51,7 +51,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -92,8 +91,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Map timeoutClusterStateListeners = new ConcurrentHashMap<>(); - private final LocalNodeMasterListeners localNodeMasterListeners; - private final AtomicReference state; // last applied state private final String nodeName; @@ -104,7 +101,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements this.clusterSettings = clusterSettings; this.threadPool = threadPool; this.state = new AtomicReference<>(); - this.localNodeMasterListeners = new LocalNodeMasterListeners(); this.nodeName = nodeName; this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); @@ -134,7 +130,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements protected synchronized void doStart() { Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); Objects.requireNonNull(state.get(), "please set initial state before starting"); - addListener(localNodeMasterListeners); threadPoolExecutor = createThreadPoolExecutor(); } @@ -179,7 +174,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS); - removeListener(localNodeMasterListeners); } @Override @@ -255,7 +249,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements * Add a listener for on/off local node master events */ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) { - localNodeMasterListeners.add(listener); + addListener(listener); } /** @@ -606,43 +600,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } - private static class LocalNodeMasterListeners implements ClusterStateListener { - - private final List listeners = new CopyOnWriteArrayList<>(); - private volatile boolean master = false; - - private LocalNodeMasterListeners() { - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (!master && event.localNodeMaster()) { - master = true; - for (LocalNodeMasterListener listener : listeners) { - try { - listener.onMaster(); - } catch (Exception e) { - logger.warn("failed to notify LocalNodeMasterListener", e); - } - } - } else if (master && !event.localNodeMaster()) { - master = false; - for (LocalNodeMasterListener listener : listeners) { - try { - listener.offMaster(); - } catch (Exception e) { - logger.warn("failed to notify LocalNodeMasterListener", e); - } - } - } - } - - private void add(LocalNodeMasterListener listener) { - listeners.add(listener); - } - - } - // this one is overridden in tests so we can control time protected long currentTimeInMillis() { return threadPool.relativeTimeInMillis();