Nodes APIs: All node APIs to allow to match on nodes based on addresses, names, and attributes, closes #1125.

This commit is contained in:
kimchy 2011-07-16 03:27:35 +03:00
parent 2594828d48
commit 64054d4057
8 changed files with 91 additions and 36 deletions

View File

@ -19,9 +19,6 @@
package org.elasticsearch.action; package org.elasticsearch.action;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ -34,31 +31,4 @@ public class Actions {
validationException.addValidationError(error); validationException.addValidationError(error);
return validationException; return validationException;
} }
public static boolean isAllNodes(String... nodesIds) {
return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
}
public static String[] buildNodesIds(DiscoveryNodes nodes, String... nodesIds) {
if (isAllNodes(nodesIds)) {
int index = 0;
nodesIds = new String[nodes.size()];
for (DiscoveryNode node : nodes) {
nodesIds[index++] = node.id();
}
return nodesIds;
} else {
String[] resolvedNodesIds = new String[nodesIds.length];
for (int i = 0; i < nodesIds.length; i++) {
if (nodesIds[i].equals("_local")) {
resolvedNodesIds[i] = nodes.localNodeId();
} else if (nodesIds[i].equals("_master")) {
resolvedNodesIds[i] = nodes.masterNodeId();
} else {
resolvedNodesIds[i] = nodesIds[i];
}
}
return resolvedNodesIds;
}
}
} }

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.cluster.node.shutdown;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
@ -105,7 +104,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
throw new ElasticSearchIllegalStateException("Shutdown is disabled"); throw new ElasticSearchIllegalStateException("Shutdown is disabled");
} }
Set<DiscoveryNode> nodes = Sets.newHashSet(); Set<DiscoveryNode> nodes = Sets.newHashSet();
if (Actions.isAllNodes(request.nodesIds)) { if (state.nodes().isAllNodes(request.nodesIds)) {
logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay); logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay);
nodes.addAll(state.nodes().dataNodes().values()); nodes.addAll(state.nodes().dataNodes().values());
nodes.addAll(state.nodes().masterNodes().values()); nodes.addAll(state.nodes().masterNodes().values());
@ -162,7 +161,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
}); });
t.start(); t.start();
} else { } else {
final String[] nodesIds = Actions.buildNodesIds(state.nodes(), request.nodesIds); final String[] nodesIds = state.nodes().resolveNodes(request.nodesIds);
logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay); logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay);
for (String nodeId : nodesIds) { for (String nodeId : nodesIds) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.support.nodes;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.action.support.BaseAction;
@ -33,7 +32,13 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -120,7 +125,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
clusterState = clusterService.state(); clusterState = clusterService.state();
String[] nodesIds = Actions.buildNodesIds(clusterState.nodes(), request.nodesIds()); String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length); this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length);
} }

View File

@ -26,9 +26,11 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -169,6 +171,59 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
return null; return null;
} }
public boolean isAllNodes(String... nodesIds) {
return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
}
public String[] resolveNodes(String... nodesIds) {
if (isAllNodes(nodesIds)) {
int index = 0;
nodesIds = new String[nodes.size()];
for (DiscoveryNode node : this) {
nodesIds[index++] = node.id();
}
return nodesIds;
} else {
Set<String> resolvedNodesIds = new HashSet<String>(nodesIds.length);
for (String nodeId : nodesIds) {
if (nodeId.equals("_local")) {
resolvedNodesIds.add(localNodeId());
} else if (nodeId.equals("_master")) {
resolvedNodesIds.add(masterNodeId());
} else if (nodeExists(nodeId)) {
resolvedNodesIds.add(nodeId);
} else {
// not a node id, try and search by name
for (DiscoveryNode node : this) {
if (Regex.simpleMatch(nodeId, node.name())) {
resolvedNodesIds.add(node.id());
}
}
for (DiscoveryNode node : this) {
if (node.address().match(nodeId)) {
resolvedNodesIds.add(node.id());
}
}
int index = nodeId.indexOf(':');
if (index != -1) {
String matchAttrName = nodeId.substring(0, index);
String matchAttrValue = nodeId.substring(index + 1);
for (DiscoveryNode node : this) {
for (Map.Entry<String, String> entry : node.attributes().entrySet()) {
String attrName = entry.getKey();
String attrValue = entry.getValue();
if (Regex.simpleMatch(matchAttrName, attrName) && Regex.simpleMatch(matchAttrValue, attrValue)) {
resolvedNodesIds.add(node.id());
}
}
}
}
}
}
return resolvedNodesIds.toArray(new String[resolvedNodesIds.size()]);
}
}
public DiscoveryNodes removeDeadMembers(Set<String> newNodes, String masterNodeId) { public DiscoveryNodes removeDeadMembers(Set<String> newNodes, String masterNodeId) {
Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId); Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId);
for (DiscoveryNode node : this) { for (DiscoveryNode node : this) {

View File

@ -38,6 +38,10 @@ public class DummyTransportAddress implements TransportAddress {
return 0; return 0;
} }
@Override public boolean match(String otherAddress) {
return false;
}
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.transport;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import java.io.IOException; import java.io.IOException;
import java.net.Inet6Address; import java.net.Inet6Address;
@ -62,6 +63,21 @@ public class InetSocketTransportAddress implements TransportAddress {
return 1; return 1;
} }
@Override public boolean match(String otherAddress) {
if (address.getHostName() != null && Regex.simpleMatch(otherAddress, address.getHostName())) {
return true;
}
if (address.getAddress() != null) {
if (address.getAddress().getHostName() != null && Regex.simpleMatch(otherAddress, address.getAddress().getHostName())) {
return true;
}
if (address.getAddress().getHostAddress() != null && Regex.simpleMatch(otherAddress, address.getAddress().getHostAddress())) {
return true;
}
}
return false;
}
public InetSocketAddress address() { public InetSocketAddress address() {
return this.address; return this.address;
} }

View File

@ -46,6 +46,10 @@ public class LocalTransportAddress implements TransportAddress {
return 2; return 2;
} }
@Override public boolean match(String otherAddress) {
return id.equals(otherAddress);
}
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
id = in.readUTF(); id = in.readUTF();
} }

View File

@ -29,4 +29,6 @@ import java.io.Serializable;
public interface TransportAddress extends Streamable, Serializable { public interface TransportAddress extends Streamable, Serializable {
short uniqueAddressTypeId(); short uniqueAddressTypeId();
boolean match(String otherAddress);
} }