diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 0b3aef49fb1..7f4bfb676c5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -20,21 +20,45 @@ package org.elasticsearch.cluster.node; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.lucene.util.StringHelper; import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamOutput; import org.elasticsearch.util.io.stream.Streamable; +import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.transport.TransportAddress; import org.elasticsearch.util.transport.TransportAddressSerializers; import java.io.IOException; import java.io.Serializable; +import java.util.Map; + +import static org.elasticsearch.util.transport.TransportAddressSerializers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class DiscoveryNode implements Streamable, Serializable { + public static Map buildCommonNodesAttributes(Settings settings) { + Map attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap()); + if (attributes.containsKey("client")) { + if (attributes.get("client").equals("false")) { + attributes.remove("client"); // this is the default + } else { + // if we are client node, don't store data ... + attributes.put("data", "false"); + } + } + if (attributes.containsKey("data")) { + if (attributes.get("data").equals("true")) { + attributes.remove("data"); + } + } + return attributes; + } + public static final ImmutableList EMPTY_LIST = ImmutableList.of(); private String nodeName = StringHelper.intern(""); @@ -43,22 +67,26 @@ public class DiscoveryNode implements Streamable, Serializable { private TransportAddress address; - private boolean dataNode = true; + private ImmutableMap attributes; private DiscoveryNode() { } public DiscoveryNode(String nodeId, TransportAddress address) { - this("", true, nodeId, address); + this("", nodeId, address, ImmutableMap.of()); } - public DiscoveryNode(String nodeName, boolean dataNode, String nodeId, TransportAddress address) { + public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map attributes) { if (nodeName == null) { this.nodeName = StringHelper.intern(""); } else { this.nodeName = StringHelper.intern(nodeName); } - this.dataNode = dataNode; + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Map.Entry entry : attributes.entrySet()) { + builder.put(StringHelper.intern(entry.getKey()), StringHelper.intern(entry.getValue())); + } + this.attributes = builder.build(); this.nodeId = StringHelper.intern(nodeId); this.address = address; } @@ -105,11 +133,26 @@ public class DiscoveryNode implements Streamable, Serializable { return name(); } + /** + * The node attributes. + */ + public ImmutableMap attributes() { + return this.attributes; + } + + /** + * The node attributes. + */ + public ImmutableMap getAttributes() { + return attributes(); + } + /** * Should this node hold data (shards) or not. */ public boolean dataNode() { - return dataNode; + String data = attributes.get("data"); + return data == null || data.equals("true"); } /** @@ -119,6 +162,18 @@ public class DiscoveryNode implements Streamable, Serializable { return dataNode(); } + /** + * Is the node a client node or not. + */ + public boolean clientNode() { + String client = attributes.get("client"); + return client != null && client.equals("true"); + } + + public boolean isClientNode() { + return clientNode(); + } + public static DiscoveryNode readNode(StreamInput in) throws IOException { DiscoveryNode node = new DiscoveryNode(); node.readFrom(in); @@ -127,16 +182,25 @@ public class DiscoveryNode implements Streamable, Serializable { @Override public void readFrom(StreamInput in) throws IOException { nodeName = StringHelper.intern(in.readUTF()); - dataNode = in.readBoolean(); nodeId = StringHelper.intern(in.readUTF()); address = TransportAddressSerializers.addressFromStream(in); + int size = in.readVInt(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < size; i++) { + builder.put(StringHelper.intern(in.readUTF()), StringHelper.intern(in.readUTF())); + } + attributes = builder.build(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeUTF(nodeName); - out.writeBoolean(dataNode); out.writeUTF(nodeId); - TransportAddressSerializers.addressToStream(out, address); + addressToStream(out, address); + out.writeVInt(attributes.size()); + for (Map.Entry entry : attributes.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } } @Override public boolean equals(Object obj) { @@ -159,12 +223,12 @@ public class DiscoveryNode implements Streamable, Serializable { if (nodeId != null) { sb.append('[').append(nodeId).append(']'); } - if (dataNode) { - sb.append("[data]"); - } if (address != null) { sb.append('[').append(address).append(']'); } + if (!attributes.isEmpty()) { + sb.append(attributes); + } return sb.toString(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java index c8b18942a58..13d5f458c6d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.Maps.*; import static com.google.common.collect.Sets.*; import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.node.DiscoveryNode.*; /** * @author kimchy (Shay Banon) @@ -142,11 +143,11 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl channel.connect(clusterName.value()); channel.setReceiver(this); logger.debug("Connected to cluster [{}], address [{}]", channel.getClusterName(), channel.getAddress()); - this.localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), channel.getAddress().toString(), transportService.boundAddress().publishAddress()); + this.localNode = new DiscoveryNode(settings.get("name"), channel.getAddress().toString(), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings)); if (isMaster()) { firstMaster = true; - clusterService.submitStateUpdateTask("jgroups-disco-initialconnect(master)", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("jgroups-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() .localNodeId(localNode.id()) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 3da929a2a63..aebc1d8cc8e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong; import static com.google.common.collect.Sets.*; import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.node.DiscoveryNode.*; /** * @author kimchy (Shay Banon) @@ -84,14 +85,14 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem clusterGroups.put(clusterName, clusterGroup); } logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress()); + this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings)); clusterGroup.members().add(this); if (clusterGroup.members().size() == 1) { // we are the first master (and the master) master = true; firstMaster = true; - clusterService.submitStateUpdateTask("local-disco-initialconnect(master)", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() .localNodeId(localNode.id()) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 68a60a333a7..c89613444d9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -42,11 +42,13 @@ import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.settings.Settings; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.Lists.*; import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.util.TimeValue.*; @@ -55,6 +57,8 @@ import static org.elasticsearch.util.TimeValue.*; */ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, DiscoveryNodesProvider { + private final ThreadPool threadPool; + private final TransportService transportService; private final ClusterService clusterService; @@ -94,6 +98,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen ZenPingService pingService) { super(settings); this.clusterName = clusterName; + this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; this.pingService = pingService; @@ -114,57 +119,29 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } @Override protected void doStart() throws ElasticSearchException { - localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress()); + Map nodeAttributes = buildCommonNodesAttributes(settings); + Boolean zenMaster = componentSettings.getAsBoolean("master", null); + if (zenMaster != null) { + if (zenMaster.equals(Boolean.FALSE)) { + nodeAttributes.put("zen.master", "false"); + } + } else if (nodeAttributes.containsKey("client")) { + if (nodeAttributes.get("client").equals("true")) { + nodeAttributes.put("zen.master", "false"); + } + } + localNode = new DiscoveryNode(settings.get("name"), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress(), nodeAttributes); pingService.start(); - boolean retry = true; - while (retry) { - retry = false; - DiscoveryNode masterNode = broadBingTillMasterResolved(); - if (localNode.equals(masterNode)) { - // we are the master (first) - this.firstMaster = true; - this.master = true; - nodesFD.start(); // start the nodes FD - clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() - .localNodeId(localNode.id()) - .masterNodeId(localNode.id()) - // put our local node - .put(localNode); - // update the fact that we are the master... - latestDiscoNodes = builder.build(); - return newClusterStateBuilder().state(currentState).nodes(builder).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - sendInitialStateEventIfNeeded(); - } - }); - } else { - this.firstMaster = false; - this.master = false; - try { - // first, make sure we can connect to the master - transportService.connectToNode(masterNode); - } catch (Exception e) { - logger.warn("Failed to connect to master [{}], retrying...", e, masterNode); - retry = true; - continue; + if (nodeAttributes.containsKey("zen.master") && nodeAttributes.get("zen.master").equals("false")) { + // do the join on a different thread + threadPool.execute(new Runnable() { + @Override public void run() { + initialJoin(); } - // send join request - try { - membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); - } catch (Exception e) { - logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode); - // failed to send the join request, retry - retry = true; - continue; - } - // cool, we found a master, start an FD on it - masterFD.start(masterNode); - } + }); + } else { + initialJoin(); } } @@ -239,6 +216,63 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen publishClusterState.publish(clusterState); } + private void initialJoin() { + boolean retry = true; + while (retry) { + retry = false; + DiscoveryNode masterNode = broadPingTillMasterResolved(); + if (localNode.equals(masterNode)) { + // we are the master (first) + this.firstMaster = true; + this.master = true; + nodesFD.start(); // start the nodes FD + clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() + .localNodeId(localNode.id()) + .masterNodeId(localNode.id()) + // put our local node + .put(localNode); + // update the fact that we are the master... + latestDiscoNodes = builder.build(); + return newClusterStateBuilder().state(currentState).nodes(builder).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + sendInitialStateEventIfNeeded(); + } + }); + } else { + this.firstMaster = false; + this.master = false; + try { + // first, make sure we can connect to the master + transportService.connectToNode(masterNode); + } catch (Exception e) { + logger.warn("Failed to connect to master [{}], retrying...", e, masterNode); + retry = true; + continue; + } + // send join request + try { + membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); + } catch (Exception e) { + logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode); + // failed to send the join request, retry + retry = true; + continue; + } + // cool, we found a master, start an FD on it + masterFD.start(masterNode); + } + if (retry) { + if (!lifecycle.started()) { + return; + } + } + } + } + private void handleNodeFailure(final DiscoveryNode node) { if (!master) { // nothing to do here... @@ -365,7 +399,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } - private DiscoveryNode broadBingTillMasterResolved() { + private DiscoveryNode broadPingTillMasterResolved() { while (true) { ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout); List pingMasters = newArrayList(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index aa18dce53e2..d8067301e9a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -26,6 +26,7 @@ import org.elasticsearch.util.settings.Settings; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import static com.google.common.collect.Lists.*; @@ -45,7 +46,7 @@ public class ElectMasterService extends AbstractComponent { * Returns a list of the next possible masters. */ public DiscoveryNode[] nextPossibleMasters(Iterable nodes, int numberOfPossibleMasters) { - List sortedNodes = sortedNodes(nodes); + List sortedNodes = sortedMasterNodes(nodes); if (sortedNodes == null) { return new DiscoveryNode[0]; } @@ -65,18 +66,27 @@ public class ElectMasterService extends AbstractComponent { * if no master has been elected. */ public DiscoveryNode electMaster(Iterable nodes) { - List sortedNodes = sortedNodes(nodes); - if (sortedNodes == null) { + List sortedNodes = sortedMasterNodes(nodes); + if (sortedNodes == null || sortedNodes.isEmpty()) { return null; } return sortedNodes.get(0); } - private List sortedNodes(Iterable nodes) { + private List sortedMasterNodes(Iterable nodes) { List possibleNodes = Lists.newArrayList(nodes); if (possibleNodes.isEmpty()) { return null; } + // clean non master nodes + for (Iterator it = possibleNodes.iterator(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.attributes().containsKey("zen.master")) { + if (node.attributes().get("zen.master").equals("false")) { + it.remove(); + } + } + } Collections.sort(possibleNodes, nodeComparator); return possibleNodes; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java index 7319703f73e..9af8c720e6c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java @@ -64,7 +64,12 @@ public class RestNodesInfoAction extends BaseRestHandler { builder.field("name", nodeInfo.node().name()); builder.field("transport_address", nodeInfo.node().address().toString()); - builder.field("data_node", nodeInfo.node().dataNode()); + + builder.startArray("attributes"); + for (Map.Entry attr : nodeInfo.node().attributes().entrySet()) { + builder.field(attr.getKey(), attr.getValue()); + } + builder.endArray(); for (Map.Entry nodeAttribute : nodeInfo.attributes().entrySet()) { builder.field(nodeAttribute.getKey(), nodeAttribute.getValue()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java index 2a7679d9ef0..773ec2518e2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java @@ -83,15 +83,19 @@ public class ImmutableSettings implements Settings { throw new SettingsException("Component [" + type + "] does not start with prefix [" + prefix + "]"); } String settingPrefix = type.substring(prefix.length() + 1); // 1 for the '.' - settingPrefix = settingPrefix.substring(0, settingPrefix.length() - component.getSimpleName().length() - 1); // remove the simple class name + settingPrefix = settingPrefix.substring(0, settingPrefix.length() - component.getSimpleName().length()); // remove the simple class name (keep the dot) + return getByPrefix(settingPrefix); + } + + @Override public Settings getByPrefix(String prefix) { Builder builder = new Builder(); for (Map.Entry entry : getAsMap().entrySet()) { - if (entry.getKey().startsWith(settingPrefix)) { - if (entry.getKey().length() <= settingPrefix.length()) { + if (entry.getKey().startsWith(prefix)) { + if (entry.getKey().length() < prefix.length()) { // ignore this one continue; } - builder.put(entry.getKey().substring(settingPrefix.length() + 1), entry.getValue()); + builder.put(entry.getKey().substring(prefix.length()), entry.getValue()); } } builder.globalSettings(this); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java index 50de0624c5c..8f39c2066a1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java @@ -55,7 +55,12 @@ public interface Settings { Settings getComponentSettings(String prefix, Class component); /** - * The class loader associted with this settings. + * A settings that are filtered (and key is removed) with the specified prefix. + */ + Settings getByPrefix(String prefix); + + /** + * The class loader associated with this settings. */ ClassLoader getClassLoader();