From 8a63e58e1a677b49297641b0ea512764adb8a409 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 26 Sep 2011 21:01:23 +0300 Subject: [PATCH] Add `cloud.node.auto_attributes` setting, when set to `true`, will automatically add aws ec2 related attributes to the node (like availability zone), closes #1364. --- .../elasticsearch/cluster/ClusterModule.java | 2 + .../cluster/node/DiscoveryNode.java | 20 ----- .../cluster/node/DiscoveryNodeService.java | 85 +++++++++++++++++++ .../discovery/local/LocalDiscovery.java | 11 ++- .../discovery/zen/ZenDiscovery.java | 9 +- .../cloud/aws/AwsEc2Service.java | 7 +- .../cloud/aws/network/Ec2NameResolver.java | 5 +- .../aws/node/Ec2CustomNodeAttributes.java | 76 +++++++++++++++++ .../discovery/ec2/Ec2Discovery.java | 6 +- 9 files changed, 189 insertions(+), 32 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/node/Ec2CustomNodeAttributes.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 7fa95f9187c..30c924f0b86 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationModule; import org.elasticsearch.cluster.routing.operation.OperationRoutingModule; @@ -61,6 +62,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules { @Override protected void configure() { + bind(DiscoveryNodeService.class).asEagerSingleton(); bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); bind(MetaDataService.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 2c97b3da582..f741a3f11b5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -21,7 +21,6 @@ package org.elasticsearch.cluster.node; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -46,25 +45,6 @@ public class DiscoveryNode implements Streamable, Serializable { return !(settings.getAsBoolean("node.client", false) || (!settings.getAsBoolean("node.data", true) && !settings.getAsBoolean("node.master", true))); } - public static Map buildCommonNodesAttributes(Settings settings) { - Map attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap()); - attributes.remove("name"); // name is extracted in other places - if (attributes.containsKey("client")) { - if (attributes.get("client").equals("false")) { - attributes.remove("client"); // this is the default - } else { - // if we are client node, don't store data ... - attributes.put("data", "false"); - } - } - if (attributes.containsKey("data")) { - if (attributes.get("data").equals("true")) { - attributes.remove("data"); - } - } - return attributes; - } - public static final ImmutableList EMPTY_LIST = ImmutableList.of(); private String nodeName = "".intern(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java new file mode 100644 index 00000000000..630b9e65482 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.node; + +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + */ +public class DiscoveryNodeService extends AbstractComponent { + + private final List customAttributesProviders = new CopyOnWriteArrayList(); + + @Inject public DiscoveryNodeService(Settings settings) { + super(settings); + } + + public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) { + customAttributesProviders.add(customAttributesProvider); + return this; + } + + public Map buildAttributes() { + Map attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap()); + attributes.remove("name"); // name is extracted in other places + if (attributes.containsKey("client")) { + if (attributes.get("client").equals("false")) { + attributes.remove("client"); // this is the default + } else { + // if we are client node, don't store data ... + attributes.put("data", "false"); + } + } + if (attributes.containsKey("data")) { + if (attributes.get("data").equals("true")) { + attributes.remove("data"); + } + } + + for (CustomAttributesProvider provider : customAttributesProviders) { + try { + Map customAttributes = provider.buildAttributes(); + if (customAttributes != null) { + for (Map.Entry entry : customAttributes.entrySet()) { + if (!attributes.containsKey(entry.getKey())) { + attributes.put(entry.getKey(), entry.getValue()); + } + } + } + } catch (Exception e) { + logger.warn("failed to build custom attributes from provider [{}]", e, provider); + } + } + + return attributes; + } + + public static interface CustomAttributesProvider { + + Map buildAttributes(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 1b89d03e9fb..a98cd7e929d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -46,7 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.common.collect.Sets.*; /** @@ -58,6 +58,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private final ClusterService clusterService; + private final DiscoveryNodeService discoveryNodeService; + private final ClusterName clusterName; private DiscoveryNode localNode; @@ -73,11 +75,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final AtomicLong nodeIdGenerator = new AtomicLong(); - @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService) { + @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, + DiscoveryNodeService discoveryNodeService) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; this.transportService = transportService; + this.discoveryNodeService = discoveryNodeService; } @Override protected void doStart() throws ElasticSearchException { @@ -88,7 +92,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem clusterGroups.put(clusterName, clusterGroup); } logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings)); + this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), + discoveryNodeService.buildAttributes()); clusterGroup.members().add(this); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index cafcee72d9a..2d26b8f0c35 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.UUID; @@ -59,7 +60,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.unit.TimeValue.*; @@ -77,6 +77,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final ClusterName clusterName; + private final DiscoveryNodeService discoveryNodeService; + private final ZenPingService pingService; private final MasterFaultDetection masterFD; @@ -110,12 +112,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - ZenPingService pingService) { + DiscoveryNodeService discoveryNodeService, ZenPingService pingService) { super(settings); this.clusterName = clusterName; this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; + this.discoveryNodeService = discoveryNodeService; this.pingService = pingService; // also support direct discovery.zen settings, for cases when it gets extended @@ -138,7 +141,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } @Override protected void doStart() throws ElasticSearchException { - Map nodeAttributes = buildCommonNodesAttributes(settings); + Map nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling String nodeId = UUID.randomBase64UUID(); localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes); diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java index dac3fad3866..a66261bb87a 100644 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java @@ -27,6 +27,8 @@ import com.amazonaws.services.ec2.AmazonEC2Client; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cloud.aws.network.Ec2NameResolver; +import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkService; @@ -38,13 +40,16 @@ import org.elasticsearch.common.settings.SettingsFilter; */ public class AwsEc2Service extends AbstractLifecycleComponent { + public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/"; + private AmazonEC2Client client; - @Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService) { + @Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService, DiscoveryNodeService discoveryNodeService) { super(settings); settingsFilter.addFilter(new AwsSettingsFilter()); // add specific ec2 name resolver networkService.addCustomNameResolver(new Ec2NameResolver(settings)); + discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings)); } public synchronized AmazonEC2 client() { diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/network/Ec2NameResolver.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/network/Ec2NameResolver.java index b9785394935..6534fc247e3 100755 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/network/Ec2NameResolver.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/network/Ec2NameResolver.java @@ -20,6 +20,7 @@ package org.elasticsearch.cloud.aws.network; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.Closeables; import org.elasticsearch.common.network.NetworkService.CustomNameResolver; @@ -78,8 +79,6 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso } } - private static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/"; - /** * Construct a {@link CustomNameResolver}. */ @@ -97,7 +96,7 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso URLConnection urlConnection = null; InputStream in = null; try { - URL url = new URL(EC2_METADATA_URL + type.ec2Name); + URL url = new URL(AwsEc2Service.EC2_METADATA_URL + type.ec2Name); logger.debug("obtaining ec2 hostname from ec2 meta-data url {}", url); urlConnection = url.openConnection(); urlConnection.setConnectTimeout(2000); diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/node/Ec2CustomNodeAttributes.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/node/Ec2CustomNodeAttributes.java new file mode 100644 index 00000000000..dd942b1a9af --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/node/Ec2CustomNodeAttributes.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.aws.node; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cloud.aws.AwsEc2Service; +import org.elasticsearch.cluster.node.DiscoveryNodeService; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.Closeables; +import org.elasticsearch.common.settings.Settings; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLConnection; +import java.util.Map; + +/** + */ +public class Ec2CustomNodeAttributes extends AbstractComponent implements DiscoveryNodeService.CustomAttributesProvider { + + public Ec2CustomNodeAttributes(Settings settings) { + super(settings); + } + + @Override public Map buildAttributes() { + if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) { + return null; + } + Map ec2Attributes = Maps.newHashMap(); + + URLConnection urlConnection; + InputStream in = null; + try { + URL url = new URL(AwsEc2Service.EC2_METADATA_URL + "placement/availability-zone"); + logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url); + urlConnection = url.openConnection(); + urlConnection.setConnectTimeout(2000); + in = urlConnection.getInputStream(); + BufferedReader urlReader = new BufferedReader(new InputStreamReader(in)); + + String metadataResult = urlReader.readLine(); + if (metadataResult == null || metadataResult.length() == 0) { + logger.error("no ec2 metadata returned from {}", url); + return null; + } + ec2Attributes.put("aws_availability_zone", metadataResult); + } catch (IOException e) { + logger.debug("failed to get metadata for [placement/availability-zone]: " + ExceptionsHelper.detailedMessage(e)); + } finally { + Closeables.closeQuietly(in); + } + + return ec2Attributes; + } +} diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java index 26144d10c07..7005860717f 100755 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.ec2; import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -39,8 +40,9 @@ import org.elasticsearch.transport.TransportService; public class Ec2Discovery extends ZenDiscovery { @Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) { - super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, pingService); + ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService, + DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service) { + super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, discoveryNodeService, pingService); if (settings.getAsBoolean("cloud.enabled", true)) { ImmutableList zenPings = pingService.zenPings(); UnicastZenPing unicastZenPing = null;