From 770bac252ac54b89f0a9d5242cb837040277b1c0 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 19 Sep 2010 23:49:15 +0200 Subject: [PATCH] fix local discovery to handle master nodes specifically better --- .../TransportMasterNodeOperationAction.java | 2 +- .../discovery/local/LocalDiscovery.java | 96 +++++++++++-------- .../aliases/IndexAliasesTests.java | 22 ++--- 3 files changed, 69 insertions(+), 51 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index cac95e2bc91..59fde617cc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -157,7 +157,7 @@ public abstract class TransportMasterNodeOperationAction implem this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings)); clusterGroup.members().add(this); - if (clusterGroup.members().size() == 1) { + + LocalDiscovery firstMaster = null; + for (LocalDiscovery localDiscovery : clusterGroup.members()) { + if (localDiscovery.localNode().masterNode()) { + firstMaster = localDiscovery; + break; + } + } + + if (firstMaster != null && firstMaster.equals(this)) { // we are the first master (and the master) master = true; + final LocalDiscovery master = firstMaster; clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() - .localNodeId(localNode.id()) - .masterNodeId(localNode.id()) - // put our local node - .put(localNode); - return newClusterStateBuilder().state(currentState).nodes(builder).build(); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder(); + for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { + nodesBuilder.put(discovery.localNode); + } + nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); + return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).build(); } @Override public void clusterStateProcessed(ClusterState clusterState) { sendInitialStateEventIfNeeded(); } }); - } else { - // we are not the master, tell the master to send it - LocalDiscovery master = clusterGroup.members().peek(); - + } else if (firstMaster != null) { // update as fast as we can the local node state with the new metadata (so we create indices for example) - final ClusterState masterState = master.clusterService.state(); + final ClusterState masterState = firstMaster.clusterService.state(); clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { // make sure we have the local node id set, we might need it as a result of the new metadata @@ -118,14 +125,15 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem }); // tell the master to send the fact that we are here - master.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { + final LocalDiscovery master = firstMaster; + firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - if (currentState.nodes().nodeExists(localNode.id())) { - // no change, the node already exists in the cluster - logger.warn("received an address [{}] for an existing node [{}]", localNode.address(), localNode); - return currentState; + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder(); + for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { + nodesBuilder.put(discovery.localNode); } - return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(localNode)).build(); + nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); + return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).build(); } @Override public void clusterStateProcessed(ClusterState clusterState) { @@ -133,7 +141,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } }); } - } + } // else, no master node, the next node that will start will fill things in... } @Override protected void doStop() throws ElasticSearchException { @@ -150,27 +158,37 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem return; } - final LocalDiscovery masterDiscovery = clusterGroup.members().peek(); - // if the removed node is the master, make the next one as the master - if (master) { - masterDiscovery.master = true; - } - - final Set newMembers = newHashSet(); - for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode.id()); - } - - masterDiscovery.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, masterDiscovery.localNode.id()); - DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); - if (delta.added()) { - logger.warn("No new nodes should be created when a new discovery view is accepted"); - } - return newClusterStateBuilder().state(currentState).nodes(newNodes).build(); + LocalDiscovery firstMaster = null; + for (LocalDiscovery localDiscovery : clusterGroup.members()) { + if (localDiscovery.localNode().masterNode()) { + firstMaster = localDiscovery; + break; } - }); + } + + if (firstMaster != null) { + // if the removed node is the master, make the next one as the master + if (master) { + firstMaster.master = true; + } + + final Set newMembers = newHashSet(); + for (LocalDiscovery discovery : clusterGroup.members()) { + newMembers.add(discovery.localNode.id()); + } + + final LocalDiscovery master = firstMaster; + master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); + DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); + if (delta.added()) { + logger.warn("No new nodes should be created when a new discovery view is accepted"); + } + return newClusterStateBuilder().state(currentState).nodes(newNodes).build(); + } + }); + } } } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java index c3d9a1d8496..a44e461263a 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java @@ -65,45 +65,45 @@ public class IndexAliasesTests extends AbstractNodesTests { @Test public void testAliases() throws Exception { - logger.info("Creating index [test]"); + logger.info("--> creating index [test]"); client1.admin().indices().create(createIndexRequest("test")).actionGet(); - logger.info("Running Cluster Health"); + logger.info("--> running cluster_health"); ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); - logger.info("Done Cluster Health, status " + clusterHealth.status()); + logger.info("--> done cluster_health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); try { - logger.info("Indexing against [alias1], should fail"); + logger.info("--> indexing against [alias1], should fail"); client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); assert false : "index [alias1] should not exists"; } catch (IndexMissingException e) { assertThat(e.index().name(), equalTo("alias1")); } - logger.info("Aliasing index [test] with [alias1]"); + logger.info("--> aliasing index [test] with [alias1]"); client1.admin().indices().prepareAliases().addAlias("test", "alias1").execute().actionGet(); Thread.sleep(300); - logger.info("Indexing against [alias1], should work now"); + logger.info("--> indexing against [alias1], should work now"); IndexResponse indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); assertThat(indexResponse.index(), equalTo("test")); - logger.info("Creating index [test]"); + logger.info("--> creating index [test]"); client1.admin().indices().create(createIndexRequest("test_x")).actionGet(); - logger.info("Running Cluster Health"); + logger.info("--> running cluster_health"); clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); - logger.info("Done Cluster Health, status " + clusterHealth.status()); + logger.info("--> done cluster_health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); - logger.info("Remove [alias1], Aliasing index [test_x] with [alias1]"); + logger.info("--> remove [alias1], Aliasing index [test_x] with [alias1]"); client1.admin().indices().aliases(indexAliasesRequest().removeAlias("test", "alias1").addAlias("test_x", "alias1")).actionGet(); Thread.sleep(300); - logger.info("Indexing against [alias1], should work against [test_x]"); + logger.info("--> indexing against [alias1], should work against [test_x]"); indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); assertThat(indexResponse.index(), equalTo("test_x")); }