introduced notion of Role in DiscoveryNode

A node can now have roles, Role is an enum made of master, data, ingest. A ndoe with no roles is simplicitly a coordinating only node. Roles are resolved once at construction time based on node attributes and never serialized. Moving DiscoveryNode to Writeable helps cleaning up the code, making fields final allow to easily see where roles need to be initialized and do it in one single place.
This commit is contained in:
javanna 2016-03-04 11:19:11 +01:00 committed by Luca Cavanna
parent eb941d8005
commit 4224371d8b
8 changed files with 150 additions and 87 deletions

View File

@ -59,7 +59,12 @@ public final class LivenessResponse extends ActionResponse {
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
clusterName.writeTo(out); clusterName.writeTo(out);
out.writeOptionalStreamable(node); if (node == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
node.writeTo(out);
}
} }
public ClusterName getClusterName() { public ClusterName getClusterName() {

View File

@ -26,8 +26,9 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.transport.TransportAddressSerializers; import org.elasticsearch.common.transport.TransportAddressSerializers;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -36,19 +37,19 @@ import org.elasticsearch.node.Node;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.transport.TransportAddressSerializers.addressToStream; import static org.elasticsearch.common.transport.TransportAddressSerializers.addressToStream;
/** /**
* A discovery node represents a node that is part of the cluster. * A discovery node represents a node that is part of the cluster.
*/ */
public class DiscoveryNode implements Streamable, ToXContent { public class DiscoveryNode implements Writeable<DiscoveryNode>, ToXContent {
public static final String DATA_ATTR = "data"; private static final DiscoveryNode PROTOTYPE = new DiscoveryNode("prototype", DummyTransportAddress.INSTANCE, Version.CURRENT);
public static final String MASTER_ATTR = "master";
public static final String INGEST_ATTR = "ingest";
public static boolean localNode(Settings settings) { public static boolean localNode(Settings settings) {
if (Node.NODE_LOCAL_SETTING.exists(settings)) { if (Node.NODE_LOCAL_SETTING.exists(settings)) {
@ -85,22 +86,20 @@ public class DiscoveryNode implements Streamable, ToXContent {
public static final List<DiscoveryNode> EMPTY_LIST = Collections.emptyList(); public static final List<DiscoveryNode> EMPTY_LIST = Collections.emptyList();
private String nodeName = ""; private final String nodeName;
private String nodeId; private final String nodeId;
private String hostName; private final String hostName;
private String hostAddress; private final String hostAddress;
private TransportAddress address; private final TransportAddress address;
private ImmutableOpenMap<String, String> attributes; private final ImmutableOpenMap<String, String> attributes;
private Version version = Version.CURRENT; private final Version version;
private final Set<Role> roles;
DiscoveryNode() {
}
/** /**
* Creates a new {@link DiscoveryNode} * Creates a new {@link DiscoveryNode}
* <p> * <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current version. * <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated. * and updated.
* </p> * </p>
@ -116,8 +115,8 @@ public class DiscoveryNode implements Streamable, ToXContent {
/** /**
* Creates a new {@link DiscoveryNode} * Creates a new {@link DiscoveryNode}
* <p> * <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current version. * <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated. * and updated.
* </p> * </p>
@ -135,8 +134,8 @@ public class DiscoveryNode implements Streamable, ToXContent {
/** /**
* Creates a new {@link DiscoveryNode}. * Creates a new {@link DiscoveryNode}.
* <p> * <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current version. * <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated. * and updated.
* </p> * </p>
@ -149,27 +148,16 @@ public class DiscoveryNode implements Streamable, ToXContent {
* @param attributes node attributes * @param attributes node attributes
* @param version the version of the node. * @param version the version of the node.
*/ */
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address, Map<String, String> attributes, Version version) { public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address,
if (nodeName != null) { Map<String, String> attributes, Version version) {
this.nodeName = nodeName.intern(); this(nodeName, nodeId, hostName, hostAddress, address, copyAttributes(attributes), version);
}
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.builder();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
builder.put(entry.getKey().intern(), entry.getValue().intern());
}
this.attributes = builder.build();
this.nodeId = nodeId.intern();
this.hostName = hostName.intern();
this.hostAddress = hostAddress.intern();
this.address = address;
this.version = version;
} }
/** /**
* Creates a new {@link DiscoveryNode}. * Creates a new {@link DiscoveryNode}.
* <p> * <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current version. * <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated. * and updated.
* </p> * </p>
@ -182,21 +170,59 @@ public class DiscoveryNode implements Streamable, ToXContent {
* @param attributes node attributes * @param attributes node attributes
* @param version the version of the node. * @param version the version of the node.
*/ */
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address, ImmutableOpenMap<String, String> attributes, Version version) { public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address,
ImmutableOpenMap<String, String> attributes, Version version) {
this(nodeName, nodeId, hostName, hostAddress, address, copyAttributes(attributes), version);
}
private DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address,
ImmutableOpenMap.Builder<String, String> attributesBuilder, Version version) {
if (nodeName != null) { if (nodeName != null) {
this.nodeName = nodeName.intern(); this.nodeName = nodeName.intern();
} else {
this.nodeName = "";
} }
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<String, String> entry : attributes) {
builder.put(entry.key.intern(), entry.value.intern());
}
this.attributes = builder.build();
this.nodeId = nodeId.intern(); this.nodeId = nodeId.intern();
this.hostName = hostName.intern(); this.hostName = hostName.intern();
this.hostAddress = hostAddress.intern(); this.hostAddress = hostAddress.intern();
this.address = address; this.address = address;
if (version == null) {
this.version = Version.CURRENT;
} else {
this.version = version; this.version = version;
} }
this.attributes = attributesBuilder.build();
this.roles = resolveRoles(this.attributes);
}
private static ImmutableOpenMap.Builder<String, String> copyAttributes(ImmutableOpenMap<String, String> attributes) {
//we could really use copyOf and get rid of this method but we call String#intern while copying...
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<String, String> entry : attributes) {
builder.put(entry.key.intern(), entry.value.intern());
}
return builder;
}
private static ImmutableOpenMap.Builder<String, String> copyAttributes(Map<String, String> attributes) {
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.builder();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
builder.put(entry.getKey().intern(), entry.getValue().intern());
}
return builder;
}
private static Set<Role> resolveRoles(ImmutableOpenMap<String, String> attributes) {
Set<Role> roles = new HashSet<>();
for (Role role : Role.values()) {
String roleAttribute = attributes.get(role.getRoleName());
//all existing roles default to true
if (roleAttribute == null || Booleans.parseBooleanExact(roleAttribute)) {
roles.add(role);
}
}
return roles;
}
/** /**
* The address that the node can be communicated with. * The address that the node can be communicated with.
@ -258,8 +284,7 @@ public class DiscoveryNode implements Streamable, ToXContent {
* Should this node hold data (shards) or not. * Should this node hold data (shards) or not.
*/ */
public boolean dataNode() { public boolean dataNode() {
String data = attributes.get(DATA_ATTR); return roles.contains(Role.DATA);
return data == null ? true : Booleans.parseBooleanExact(data);
} }
/** /**
@ -273,8 +298,7 @@ public class DiscoveryNode implements Streamable, ToXContent {
* Can this node become master or not. * Can this node become master or not.
*/ */
public boolean masterNode() { public boolean masterNode() {
String master = attributes.get(MASTER_ATTR); return roles.contains(Role.MASTER);
return master == null ? true : Booleans.parseBooleanExact(master);
} }
/** /**
@ -288,8 +312,11 @@ public class DiscoveryNode implements Streamable, ToXContent {
* Returns a boolean that tells whether this an ingest node or not * Returns a boolean that tells whether this an ingest node or not
*/ */
public boolean isIngestNode() { public boolean isIngestNode() {
String ingest = attributes.get(INGEST_ATTR); return roles.contains(Role.INGEST);
return ingest == null ? true : Booleans.parseBooleanExact(ingest); }
public Set<Role> getRoles() {
return roles;
} }
public Version version() { public Version version() {
@ -309,25 +336,24 @@ public class DiscoveryNode implements Streamable, ToXContent {
} }
public static DiscoveryNode readNode(StreamInput in) throws IOException { public static DiscoveryNode readNode(StreamInput in) throws IOException {
DiscoveryNode node = new DiscoveryNode(); return PROTOTYPE.readFrom(in);
node.readFrom(in);
return node;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public DiscoveryNode readFrom(StreamInput in) throws IOException {
nodeName = in.readString().intern(); String nodeName = in.readString().intern();
nodeId = in.readString().intern(); String nodeId = in.readString().intern();
hostName = in.readString().intern(); String hostName = in.readString().intern();
hostAddress = in.readString().intern(); String hostAddress = in.readString().intern();
address = TransportAddressSerializers.addressFromStream(in); TransportAddress address = TransportAddressSerializers.addressFromStream(in);
int size = in.readVInt(); int size = in.readVInt();
ImmutableOpenMap.Builder<String, String> attributes = ImmutableOpenMap.builder(size); ImmutableOpenMap.Builder<String, String> attributesBuilder = ImmutableOpenMap.builder(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
attributes.put(in.readString().intern(), in.readString().intern()); attributesBuilder.put(in.readString().intern(), in.readString().intern());
} }
this.attributes = attributes.build(); ImmutableOpenMap<String, String> attributes = attributesBuilder.build();
version = Version.readVersion(in); Version version = Version.readVersion(in);
return new DiscoveryNode(nodeName, nodeId, hostName, hostAddress, address, attributes, version);
} }
@Override @Override
@ -396,4 +422,26 @@ public class DiscoveryNode implements Streamable, ToXContent {
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public enum Role {
MASTER("master", "m"),
DATA("data", "d"),
INGEST("ingest", "i");
private final String roleName;
private final String abbreviation;
Role(String roleName, String abbreviation) {
this.roleName = roleName;
this.abbreviation = abbreviation;
}
public String getRoleName() {
return roleName;
}
public String getAbbreviation() {
return abbreviation;
}
}
} }

View File

@ -53,9 +53,9 @@ public class DiscoveryNodeService extends AbstractComponent {
+ ", " + Node.NODE_DATA_SETTING.getKey() + " and " + Node.NODE_INGEST_SETTING.getKey() + " explicitly instead"); + ", " + Node.NODE_DATA_SETTING.getKey() + " and " + Node.NODE_INGEST_SETTING.getKey() + " explicitly instead");
} }
//nocommit why don't we remove master as well if it's true? and ingest? //nocommit why don't we remove master as well if it's true? and ingest?
if (attributes.containsKey("data")) { if (attributes.containsKey(DiscoveryNode.Role.DATA.getRoleName())) {
if (attributes.get("data").equals("true")) { if (attributes.get(DiscoveryNode.Role.DATA.getRoleName()).equals("true")) {
attributes.remove("data"); attributes.remove(DiscoveryNode.Role.DATA.getRoleName());
} }
} }

View File

@ -376,18 +376,24 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
if (index != -1) { if (index != -1) {
String matchAttrName = nodeId.substring(0, index); String matchAttrName = nodeId.substring(0, index);
String matchAttrValue = nodeId.substring(index + 1); String matchAttrValue = nodeId.substring(index + 1);
if ("data".equals(matchAttrName)) { if (DiscoveryNode.Role.DATA.getRoleName().equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) { if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.addAll(dataNodes.keys()); resolvedNodesIds.addAll(dataNodes.keys());
} else { } else {
resolvedNodesIds.removeAll(dataNodes.keys()); resolvedNodesIds.removeAll(dataNodes.keys());
} }
} else if ("master".equals(matchAttrName)) { } else if (DiscoveryNode.Role.MASTER.getRoleName().equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) { if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.addAll(masterNodes.keys()); resolvedNodesIds.addAll(masterNodes.keys());
} else { } else {
resolvedNodesIds.removeAll(masterNodes.keys()); resolvedNodesIds.removeAll(masterNodes.keys());
} }
} else if (DiscoveryNode.Role.INGEST.getRoleName().equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.addAll(ingestNodes.keys());
} else {
resolvedNodesIds.removeAll(ingestNodes.keys());
}
} else { } else {
for (DiscoveryNode node : this) { for (DiscoveryNode node : this) {
for (ObjectObjectCursor<String, String> entry : node.attributes()) { for (ObjectObjectCursor<String, String> entry : node.attributes()) {

View File

@ -61,7 +61,12 @@ public class ConnectTransportException extends ActionTransportException {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeOptionalStreamable(node); if (node == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
node.writeTo(out);
}
} }
public DiscoveryNode node() { public DiscoveryNode node() {

View File

@ -123,12 +123,8 @@ public class TransportNodesActionTests extends ESTestCase {
List<DiscoveryNode> discoveryNodes = new ArrayList<>(); List<DiscoveryNode> discoveryNodes = new ArrayList<>();
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
if (randomBoolean()) { for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
attributes.put("master", Boolean.toString(randomBoolean())); attributes.put(role.getRoleName(), Boolean.toString(randomBoolean()));
attributes.put("data", Boolean.toString(randomBoolean()));
attributes.put("ingest", Boolean.toString(randomBoolean()));
} else {
attributes.put("client", "true");
} }
if (frequently()) { if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5)); attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5));

View File

@ -313,8 +313,8 @@ public class ClusterChangedEventTests extends ESTestCase {
// Create a new DiscoveryNode // Create a new DiscoveryNode
private static DiscoveryNode newNode(final String nodeId, boolean isMasterEligible, boolean isData) { private static DiscoveryNode newNode(final String nodeId, boolean isMasterEligible, boolean isData) {
final Map<String, String> attributes = MapBuilder.<String, String>newMapBuilder() final Map<String, String> attributes = MapBuilder.<String, String>newMapBuilder()
.put(DiscoveryNode.MASTER_ATTR, isMasterEligible ? "true" : "false") .put(DiscoveryNode.Role.MASTER.getRoleName(), isMasterEligible ? "true" : "false")
.put(DiscoveryNode.DATA_ATTR, isData ? "true": "false") .put(DiscoveryNode.Role.DATA.getRoleName(), isData ? "true": "false")
.immutableMap(); .immutableMap();
return new DiscoveryNode(nodeId, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); return new DiscoveryNode(nodeId, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
} }

View File

@ -104,12 +104,8 @@ public class DiscoveryNodesTests extends ESTestCase {
List<DiscoveryNode> nodesList = new ArrayList<>(); List<DiscoveryNode> nodesList = new ArrayList<>();
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
if (randomBoolean()) { for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
attributes.put("master", Boolean.toString(randomBoolean())); attributes.put(role.getRoleName(), Boolean.toString(randomBoolean()));
attributes.put("data", Boolean.toString(randomBoolean()));
attributes.put("ingest", Boolean.toString(randomBoolean()));
} else {
attributes.put("client", "true");
} }
if (frequently()) { if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5)); attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5));
@ -138,21 +134,28 @@ public class DiscoveryNodesTests extends ESTestCase {
Set<String> matchingNodeIds(DiscoveryNodes nodes) { Set<String> matchingNodeIds(DiscoveryNodes nodes) {
return Collections.singleton(nodes.masterNodeId()); return Collections.singleton(nodes.masterNodeId());
} }
}, MASTER_ELIGIBLE("master:true") { }, MASTER_ELIGIBLE(DiscoveryNode.Role.MASTER.getRoleName() + ":true") {
@Override @Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) { Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>(); Set<String> ids = new HashSet<>();
nodes.getMasterNodes().keysIt().forEachRemaining(ids::add); nodes.getMasterNodes().keysIt().forEachRemaining(ids::add);
return ids; return ids;
} }
}, DATA("data:true") { }, DATA(DiscoveryNode.Role.DATA.getRoleName() + ":true") {
@Override @Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) { Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>(); Set<String> ids = new HashSet<>();
nodes.getDataNodes().keysIt().forEachRemaining(ids::add); nodes.getDataNodes().keysIt().forEachRemaining(ids::add);
return ids; return ids;
} }
}, CUSTOM_ATTRIBUTE("attr:value") { }, INGEST(DiscoveryNode.Role.INGEST.getRoleName() + ":true") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>();
nodes.getIngestNodes().keysIt().forEachRemaining(ids::add);
return ids;
}
},CUSTOM_ATTRIBUTE("attr:value") {
@Override @Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) { Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>(); Set<String> ids = new HashSet<>();