Zen Discovery Cluster Events to have Priority.URGENT
Master node cluster state events resulting in zen discovery (node gets added, removed, for example) should be processed with priority URGENT as its always better to process them as fast as possible, and not let other events get in the way. closes #3361
This commit is contained in:
parent
6a25395c97
commit
f2614b22de
|
@ -311,7 +311,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
if (localNode.equals(masterNode)) {
|
if (localNode.equals(masterNode)) {
|
||||||
this.master = true;
|
this.master = true;
|
||||||
nodesFD.start(); // start the nodes FD
|
nodesFD.start(); // start the nodes FD
|
||||||
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
||||||
|
@ -370,7 +370,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (master) {
|
if (master) {
|
||||||
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.HIGH, new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.URGENT, new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
||||||
|
@ -401,7 +401,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
// nothing to do here...
|
// nothing to do here...
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.URGENT, new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
|
||||||
|
@ -434,7 +434,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
// nothing to do here...
|
// nothing to do here...
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
|
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
|
||||||
|
@ -465,7 +465,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
|
|
||||||
logger.info("master_left [{}], reason [{}]", masterNode, reason);
|
logger.info("master_left [{}], reason [{}]", masterNode, reason);
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
|
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
|
||||||
|
@ -516,7 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
|
|
||||||
void handleNewClusterStateFromMaster(final ClusterState newState) {
|
void handleNewClusterStateFromMaster(final ClusterState newState) {
|
||||||
if (master) {
|
if (master) {
|
||||||
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
if (newState.version() > currentState.version()) {
|
if (newState.version() > currentState.version()) {
|
||||||
|
@ -542,7 +542,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
|
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
|
||||||
|
@ -611,7 +611,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
// node calling the join request
|
// node calling the join request
|
||||||
membership.sendValidateJoinRequestBlocking(node, state, pingTimeout);
|
membership.sendValidateJoinRequestBlocking(node, state, pingTimeout);
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
if (currentState.nodes().nodeExists(node.id())) {
|
if (currentState.nodes().nodeExists(node.id())) {
|
||||||
|
@ -820,7 +820,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
|
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
|
||||||
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue