fix local discovery to handle master nodes specifically better

This commit is contained in:
kimchy 2010-09-19 23:49:15 +02:00
parent 25246902cc
commit 770bac252a
3 changed files with 69 additions and 51 deletions

View File

@ -157,7 +157,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
@Override public void onTimeout(TimeValue timeout) { @Override public void onTimeout(TimeValue timeout) {
clusterService.remove(this); clusterService.remove(this);
listener.onFailure(exp); listener.onFailure(new MasterNotDiscoveredException());
} }
@Override public void clusterChanged(ClusterChangedEvent event) { @Override public void clusterChanged(ClusterChangedEvent event) {

View File

@ -86,29 +86,36 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings)); this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings));
clusterGroup.members().add(this); clusterGroup.members().add(this);
if (clusterGroup.members().size() == 1) {
LocalDiscovery firstMaster = null;
for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (localDiscovery.localNode().masterNode()) {
firstMaster = localDiscovery;
break;
}
}
if (firstMaster != null && firstMaster.equals(this)) {
// we are the first master (and the master) // we are the first master (and the master)
master = true; master = true;
final LocalDiscovery master = firstMaster;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder();
.localNodeId(localNode.id()) for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
.masterNodeId(localNode.id()) nodesBuilder.put(discovery.localNode);
// put our local node }
.put(localNode); nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return newClusterStateBuilder().state(currentState).nodes(builder).build(); return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded(); sendInitialStateEventIfNeeded();
} }
}); });
} else { } else if (firstMaster != null) {
// we are not the master, tell the master to send it
LocalDiscovery master = clusterGroup.members().peek();
// update as fast as we can the local node state with the new metadata (so we create indices for example) // update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = master.clusterService.state(); final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata // make sure we have the local node id set, we might need it as a result of the new metadata
@ -118,14 +125,15 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}); });
// tell the master to send the fact that we are here // tell the master to send the fact that we are here
master.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(localNode.id())) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder();
// no change, the node already exists in the cluster for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
logger.warn("received an address [{}] for an existing node [{}]", localNode.address(), localNode); nodesBuilder.put(discovery.localNode);
return currentState;
} }
return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(localNode)).build(); nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
@ -133,7 +141,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
} }
}); });
} }
} } // else, no master node, the next node that will start will fill things in...
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
@ -150,27 +158,37 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
return; return;
} }
final LocalDiscovery masterDiscovery = clusterGroup.members().peek(); LocalDiscovery firstMaster = null;
// if the removed node is the master, make the next one as the master for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (master) { if (localDiscovery.localNode().masterNode()) {
masterDiscovery.master = true; firstMaster = localDiscovery;
} break;
final Set<String> newMembers = newHashSet();
for (LocalDiscovery discovery : clusterGroup.members()) {
newMembers.add(discovery.localNode.id());
}
masterDiscovery.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, masterDiscovery.localNode.id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
}
return newClusterStateBuilder().state(currentState).nodes(newNodes).build();
} }
}); }
if (firstMaster != null) {
// if the removed node is the master, make the next one as the master
if (master) {
firstMaster.master = true;
}
final Set<String> newMembers = newHashSet();
for (LocalDiscovery discovery : clusterGroup.members()) {
newMembers.add(discovery.localNode.id());
}
final LocalDiscovery master = firstMaster;
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
}
return newClusterStateBuilder().state(currentState).nodes(newNodes).build();
}
});
}
} }
} }

View File

@ -65,45 +65,45 @@ public class IndexAliasesTests extends AbstractNodesTests {
@Test public void testAliases() throws Exception { @Test public void testAliases() throws Exception {
logger.info("Creating index [test]"); logger.info("--> creating index [test]");
client1.admin().indices().create(createIndexRequest("test")).actionGet(); client1.admin().indices().create(createIndexRequest("test")).actionGet();
logger.info("Running Cluster Health"); logger.info("--> running cluster_health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status()); logger.info("--> done cluster_health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
try { try {
logger.info("Indexing against [alias1], should fail"); logger.info("--> indexing against [alias1], should fail");
client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assert false : "index [alias1] should not exists"; assert false : "index [alias1] should not exists";
} catch (IndexMissingException e) { } catch (IndexMissingException e) {
assertThat(e.index().name(), equalTo("alias1")); assertThat(e.index().name(), equalTo("alias1"));
} }
logger.info("Aliasing index [test] with [alias1]"); logger.info("--> aliasing index [test] with [alias1]");
client1.admin().indices().prepareAliases().addAlias("test", "alias1").execute().actionGet(); client1.admin().indices().prepareAliases().addAlias("test", "alias1").execute().actionGet();
Thread.sleep(300); Thread.sleep(300);
logger.info("Indexing against [alias1], should work now"); logger.info("--> indexing against [alias1], should work now");
IndexResponse indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); IndexResponse indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.index(), equalTo("test")); assertThat(indexResponse.index(), equalTo("test"));
logger.info("Creating index [test]"); logger.info("--> creating index [test]");
client1.admin().indices().create(createIndexRequest("test_x")).actionGet(); client1.admin().indices().create(createIndexRequest("test_x")).actionGet();
logger.info("Running Cluster Health"); logger.info("--> running cluster_health");
clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status()); logger.info("--> done cluster_health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
logger.info("Remove [alias1], Aliasing index [test_x] with [alias1]"); logger.info("--> remove [alias1], Aliasing index [test_x] with [alias1]");
client1.admin().indices().aliases(indexAliasesRequest().removeAlias("test", "alias1").addAlias("test_x", "alias1")).actionGet(); client1.admin().indices().aliases(indexAliasesRequest().removeAlias("test", "alias1").addAlias("test_x", "alias1")).actionGet();
Thread.sleep(300); Thread.sleep(300);
logger.info("Indexing against [alias1], should work against [test_x]"); logger.info("--> indexing against [alias1], should work against [test_x]");
indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet(); indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.index(), equalTo("test_x")); assertThat(indexResponse.index(), equalTo("test_x"));
} }