only send intiial state recived after sending join request, just when we actually get a new cluster state that includes us. Also, handle no nodes to send to with generic transport nodes action.

This commit is contained in:
kimchy 2010-07-10 15:43:49 +03:00
parent a0ead02299
commit d0cdbeffba
4 changed files with 36 additions and 7 deletions

View File

@ -19,15 +19,22 @@
package org.elasticsearch.action; package org.elasticsearch.action;
import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class PrimaryNotStartedActionException extends IndexShardException { public class PrimaryNotStartedActionException extends ElasticSearchException {
public PrimaryNotStartedActionException(ShardId shardId, String message) { public PrimaryNotStartedActionException(ShardId shardId, String message) {
super(shardId, message); super(buildMessage(shardId, message));
}
private static String buildMessage(ShardId shardId, String message) {
if (shardId == null) {
return message;
}
return "[" + shardId.index() + "][" + shardId.id() + "]" + message;
} }
} }

View File

@ -88,7 +88,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
return nodesIds; return nodesIds;
} }
private class AsyncAction { private class AsyncAction {
private final Request request; private final Request request;
@ -117,11 +116,27 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
nodesIds[index++] = node.id(); nodesIds[index++] = node.id();
} }
} }
for (int i = 0; i < nodesIds.length; i++) {
if (nodesIds[i].equals("_local")) {
nodesIds[i] = clusterState.nodes().localNodeId();
} else if (nodesIds[i].equals("_master")) {
nodesIds[i] = clusterState.nodes().masterNodeId();
}
}
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<Object>(nodesIds.length); this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length);
} }
private void start() { private void start() {
if (nodesIds.length == 0) {
// nothing to notify
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponse(request, responses));
}
});
return;
}
for (final String nodeId : nodesIds) { for (final String nodeId : nodesIds) {
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId); final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) { if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {

View File

@ -137,8 +137,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
updateTasksExecutor.execute(new Runnable() { updateTasksExecutor.execute(new Runnable() {
@Override public void run() { @Override public void run() {
if (!lifecycle.started()) { if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return; return;
} }
logger.debug("processing [{}]: execute", source);
ClusterState previousClusterState = clusterState; ClusterState previousClusterState = clusterState;
try { try {
clusterState = updateTask.execute(previousClusterState); clusterState = updateTask.execute(previousClusterState);
@ -215,6 +217,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (updateTask instanceof ProcessedClusterStateUpdateTask) { if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState); ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState);
} }
logger.debug("processing [{}]: done applying updated cluster_state", source);
} else {
logger.debug("processing [{}]: no change in cluster_state", source);
} }
} }
}); });

View File

@ -291,7 +291,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded(); // don't send initial state event, since we want to get the cluster state from the master that includes us first
// sendInitialStateEventIfNeeded();
} }
}); });
} }