LocalNodeMasterListener is a regular listener (#62485)

This commit makes the LocalNodeMasterListener interface extend the
ClusterStateListener interface and use a default implementation for
detecting whether the local node master status changed.

Backport of #62422
This commit is contained in:
Jay Modi 2020-09-16 11:42:53 -06:00 committed by GitHub
parent 6fac8478ef
commit 5da922064f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 45 deletions

View File

@ -22,7 +22,7 @@ package org.elasticsearch.cluster;
* Enables listening to master changes events of the local node (when the local node becomes the master, and when the local * Enables listening to master changes events of the local node (when the local node becomes the master, and when the local
* node cease being a master). * node cease being a master).
*/ */
public interface LocalNodeMasterListener { public interface LocalNodeMasterListener extends ClusterStateListener {
/** /**
* Called when local node is elected to be the master * Called when local node is elected to be the master
@ -33,5 +33,16 @@ public interface LocalNodeMasterListener {
* Called when the local node used to be the master, a new master was elected and it's no longer the local node. * Called when the local node used to be the master, a new master was elected and it's no longer the local node.
*/ */
void offMaster(); void offMaster();
@Override
default void clusterChanged(ClusterChangedEvent event) {
final boolean wasMaster = event.previousState().nodes().isLocalNodeElectedMaster();
final boolean isMaster = event.localNodeMaster();
if (wasMaster == false && isMaster) {
onMaster();
} else if (wasMaster && isMaster == false) {
offMaster();
}
}
} }

View File

@ -51,7 +51,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -92,8 +91,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>(); private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final LocalNodeMasterListeners localNodeMasterListeners;
private final AtomicReference<ClusterState> state; // last applied state private final AtomicReference<ClusterState> state; // last applied state
private final String nodeName; private final String nodeName;
@ -104,7 +101,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
this.clusterSettings = clusterSettings; this.clusterSettings = clusterSettings;
this.threadPool = threadPool; this.threadPool = threadPool;
this.state = new AtomicReference<>(); this.state = new AtomicReference<>();
this.localNodeMasterListeners = new LocalNodeMasterListeners();
this.nodeName = nodeName; this.nodeName = nodeName;
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
@ -134,7 +130,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
protected synchronized void doStart() { protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Objects.requireNonNull(state.get(), "please set initial state before starting"); Objects.requireNonNull(state.get(), "please set initial state before starting");
addListener(localNodeMasterListeners);
threadPoolExecutor = createThreadPoolExecutor(); threadPoolExecutor = createThreadPoolExecutor();
} }
@ -179,7 +174,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
} }
} }
ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS); ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
removeListener(localNodeMasterListeners);
} }
@Override @Override
@ -255,7 +249,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
* Add a listener for on/off local node master events * Add a listener for on/off local node master events
*/ */
public void addLocalNodeMasterListener(LocalNodeMasterListener listener) { public void addLocalNodeMasterListener(LocalNodeMasterListener listener) {
localNodeMasterListeners.add(listener); addListener(listener);
} }
/** /**
@ -606,43 +600,6 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
} }
} }
private static class LocalNodeMasterListeners implements ClusterStateListener {
private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();
private volatile boolean master = false;
private LocalNodeMasterListeners() {
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!master && event.localNodeMaster()) {
master = true;
for (LocalNodeMasterListener listener : listeners) {
try {
listener.onMaster();
} catch (Exception e) {
logger.warn("failed to notify LocalNodeMasterListener", e);
}
}
} else if (master && !event.localNodeMaster()) {
master = false;
for (LocalNodeMasterListener listener : listeners) {
try {
listener.offMaster();
} catch (Exception e) {
logger.warn("failed to notify LocalNodeMasterListener", e);
}
}
}
}
private void add(LocalNodeMasterListener listener) {
listeners.add(listener);
}
}
// this one is overridden in tests so we can control time // this one is overridden in tests so we can control time
protected long currentTimeInMillis() { protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis(); return threadPool.relativeTimeInMillis();