Add local node to cluster state

Today, the tribe node needs the local node so it adds it when it starts, but other APIs would benefit from adding the local node, also, adding the local node should be done in a cleaner manner, where it belongs, which is right after the discovery service starts in the cluster service
closes #6811
This commit is contained in:
Shay Banon 2014-07-10 13:40:05 +02:00
parent eed3513c37
commit 9ca5e6e3e1
6 changed files with 81 additions and 68 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -130,6 +131,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
discoveryService.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
submitStateUpdateTask("update local node", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(localNode()).localNodeId(localNode().id()))
.build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failed ot update local node", t);
}
});
}
});
}
@Override

View File

@ -22,17 +22,29 @@ package org.elasticsearch.common.component;
/**
*
*/
public interface LifecycleListener {
public abstract class LifecycleListener {
void beforeStart();
public void beforeStart() {
void afterStart();
}
void beforeStop();
public void afterStart() {
void afterStop();
}
void beforeClose();
public void beforeStop() {
void afterClose();
}
public void afterStop() {
}
public void beforeClose() {
}
public void afterClose() {
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
@ -37,11 +38,28 @@ import java.util.concurrent.TimeUnit;
*/
public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryService> {
private static class InitialStateListener implements InitialStateDiscoveryListener {
private final CountDownLatch latch = new CountDownLatch(1);
private volatile boolean initialStateReceived;
@Override
public void initialStateProcessed() {
initialStateReceived = true;
latch.countDown();
}
public boolean waitForInitialState(TimeValue timeValue) throws InterruptedException {
if (timeValue.millis() > 0) {
latch.await(timeValue.millis(), TimeUnit.MILLISECONDS);
}
return initialStateReceived;
}
}
private final TimeValue initialStateTimeout;
private final Discovery discovery;
private volatile boolean initialStateReceived;
private InitialStateListener initialStateListener;
@Inject
public DiscoveryService(Settings settings, Discovery discovery) {
@ -52,38 +70,28 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
@Override
protected void doStart() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);
InitialStateDiscoveryListener listener = new InitialStateDiscoveryListener() {
@Override
public void initialStateProcessed() {
latch.countDown();
}
};
discovery.addListener(listener);
try {
discovery.start();
if (initialStateTimeout.millis() > 0) {
try {
logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout);
if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.trace("initial state set from discovery");
initialStateReceived = true;
} else {
initialStateReceived = false;
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
} catch (InterruptedException e) {
// ignore
}
}
} finally {
discovery.removeListener(listener);
}
initialStateListener = new InitialStateListener();
discovery.addListener(initialStateListener);
discovery.start();
logger.info(discovery.nodeDescription());
}
public void waitForInitialState() {
try {
if (!initialStateListener.waitForInitialState(initialStateTimeout)) {
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
@Override
protected void doStop() throws ElasticsearchException {
if (initialStateListener != null) {
discovery.removeListener(initialStateListener);
}
discovery.stop();
}
@ -101,7 +109,7 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
* on {@link #doStart()}.
*/
public boolean initialStateReceived() {
return initialStateReceived;
return initialStateListener.initialStateReceived;
}
public String nodeDescription() {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
@ -245,6 +246,7 @@ public final class InternalNode implements Node {
injector.getInstance(RestController.class).start();
injector.getInstance(TransportService.class).start();
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();
// gateway should start after disco, so it can try and recovery from gateway on "start"
injector.getInstance(GatewayService.class).start();

View File

@ -167,36 +167,6 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
@Override
protected void doStart() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// add our local node to the mix...
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id()))
.build();
}
@Override
public void onFailure(String source, Throwable t) {
try {
logger.error("{}", t, source);
} finally {
latch.countDown();
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchIllegalStateException("Interrupted while starting [" + this.getClass().getSimpleName() + "]", e);
}
for (InternalNode node : nodes) {
try {
node.start();

View File

@ -61,6 +61,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
logger.info("--> should be blocked, no master...");
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state
logger.info("--> start second node, cluster should be formed");
internalCluster().startNode(settings);
@ -102,6 +103,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
});
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state
logger.info("--> starting the previous master node again...");
internalCluster().startNode(settings);