From 8af925a0b15b9a1d348170417ac9e787dfff8c86 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 17 Jun 2010 01:05:23 +0300 Subject: [PATCH] add internal flags to help simulate forcefull disconnections --- .../discovery/zen/ZenDiscovery.java | 34 +++++++++++-------- .../zen/fd/MasterFaultDetection.java | 9 ++++- .../discovery/zen/fd/NodesFaultDetection.java | 8 ++++- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 1331376e0e9..1ad702a68d2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -78,6 +78,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final TimeValue initialPingTimeout; + // a flag that should be used only for testing + private final boolean sendLeaveRequest; + private final ElectMasterService electMaster; @@ -102,6 +105,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.pingService = pingService; this.initialPingTimeout = componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)); + this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true); logger.debug("Using initial_ping_timeout [{}]", initialPingTimeout); @@ -154,22 +158,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } nodesFD.stop(); initialStateSent.set(false); - if (!master) { - try { - membership.sendLeaveRequestBlocking(latestDiscoNodes.masterNode(), localNode, TimeValue.timeValueSeconds(1)); - } catch (Exception e) { - logger.debug("Failed to send leave request to master [{}]", e, latestDiscoNodes.masterNode()); - } - } else { - DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 5); - for (DiscoveryNode possibleMaster : possibleMasters) { - if (localNode.equals(possibleMaster)) { - continue; - } + if (sendLeaveRequest) { + if (!master) { try { - membership.sendLeaveRequest(latestDiscoNodes.masterNode(), possibleMaster); + membership.sendLeaveRequestBlocking(latestDiscoNodes.masterNode(), localNode, TimeValue.timeValueSeconds(1)); } catch (Exception e) { - logger.debug("Failed to send leave request from master [{}] to possible master [{}]", e, latestDiscoNodes.masterNode(), possibleMaster); + logger.debug("Failed to send leave request to master [{}]", e, latestDiscoNodes.masterNode()); + } + } else { + DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 5); + for (DiscoveryNode possibleMaster : possibleMasters) { + if (localNode.equals(possibleMaster)) { + continue; + } + try { + membership.sendLeaveRequest(latestDiscoNodes.masterNode(), possibleMaster); + } catch (Exception e) { + logger.debug("Failed to send leave request from master [{}] to possible master [{}]", e, latestDiscoNodes.masterNode(), possibleMaster); + } } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index f15627f94b2..ec71709ed5f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -69,6 +69,10 @@ public class MasterFaultDetection extends AbstractComponent { private final int pingRetryCount; + // used mainly for testing, should always be true + private final boolean registerConnectionListener; + + private final FDConnectionListener connectionListener; private volatile MasterPinger masterPinger; @@ -91,11 +95,14 @@ public class MasterFaultDetection extends AbstractComponent { this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30)); this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3); + this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true); logger.debug("Master FD uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); this.connectionListener = new FDConnectionListener(); - transportService.addConnectionListener(connectionListener); + if (registerConnectionListener) { + transportService.addConnectionListener(connectionListener); + } transportService.registerHandler(MasterPingRequestHandler.ACTION, new MasterPingRequestHandler()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 510fce494bd..8ff91255253 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -64,6 +64,9 @@ public class NodesFaultDetection extends AbstractComponent { private final int pingRetryCount; + // used mainly for testing, should always be true + private final boolean registerConnectionListener; + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); @@ -84,13 +87,16 @@ public class NodesFaultDetection extends AbstractComponent { this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30)); this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3); + this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true); logger.debug("Nodes FD uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler()); this.connectionListener = new FDConnectionListener(); - transportService.addConnectionListener(connectionListener); + if (registerConnectionListener) { + transportService.addConnectionListener(connectionListener); + } } public void addListener(Listener listener) {