local discovery should update the local metadata state once it started

This commit is contained in:
kimchy 2010-07-23 01:39:37 +03:00
parent 5706e5f6b9
commit 9283e2a7ad
2 changed files with 15 additions and 2 deletions

View File

@ -106,11 +106,23 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
} else { } else {
// we are not the master, tell the master to send it // we are not the master, tell the master to send it
LocalDiscovery master = clusterGroup.members().peek(); LocalDiscovery master = clusterGroup.members().peek();
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = master.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder().putAll(currentState.nodes()).put(localNode).localNodeId(localNode.id());
return ClusterState.builder().state(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
}
});
// tell the master to send the fact that we are here
master.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() { master.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(localNode.id())) { if (currentState.nodes().nodeExists(localNode.id())) {
// no change, the node already exists in the cluster // no change, the node already exists in the cluster
logger.warn("Received an address [{}] for an existing node [{}]", localNode.address(), localNode); logger.warn("received an address [{}] for an existing node [{}]", localNode.address(), localNode);
return currentState; return currentState;
} }
return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(localNode)).build(); return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(localNode)).build();

View File

@ -86,8 +86,9 @@ public class SimpleRecoveryTests extends AbstractNodesTests {
// now start another one so we move some primaries // now start another one so we move some primaries
startNode("server3"); startNode("server3");
Thread.sleep(200);
logger.info("Running Cluster Health"); logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("3")).actionGet(); clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("3")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status()); logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));