mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Add cloud.node.auto_attributes
setting, when set to true
, will automatically add aws ec2 related attributes to the node (like availability zone), closes #1364.
This commit is contained in:
parent
1d6fa98c2f
commit
8a63e58e1a
@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationModule;
|
||||
import org.elasticsearch.cluster.routing.operation.OperationRoutingModule;
|
||||
@ -61,6 +62,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(DiscoveryNodeService.class).asEagerSingleton();
|
||||
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
|
||||
|
||||
bind(MetaDataService.class).asEagerSingleton();
|
||||
|
@ -21,7 +21,6 @@ package org.elasticsearch.cluster.node;
|
||||
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
@ -46,25 +45,6 @@ public class DiscoveryNode implements Streamable, Serializable {
|
||||
return !(settings.getAsBoolean("node.client", false) || (!settings.getAsBoolean("node.data", true) && !settings.getAsBoolean("node.master", true)));
|
||||
}
|
||||
|
||||
public static Map<String, String> buildCommonNodesAttributes(Settings settings) {
|
||||
Map<String, String> attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap());
|
||||
attributes.remove("name"); // name is extracted in other places
|
||||
if (attributes.containsKey("client")) {
|
||||
if (attributes.get("client").equals("false")) {
|
||||
attributes.remove("client"); // this is the default
|
||||
} else {
|
||||
// if we are client node, don't store data ...
|
||||
attributes.put("data", "false");
|
||||
}
|
||||
}
|
||||
if (attributes.containsKey("data")) {
|
||||
if (attributes.get("data").equals("true")) {
|
||||
attributes.remove("data");
|
||||
}
|
||||
}
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public static final ImmutableList<DiscoveryNode> EMPTY_LIST = ImmutableList.of();
|
||||
|
||||
private String nodeName = "".intern();
|
||||
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.node;
|
||||
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DiscoveryNodeService extends AbstractComponent {
|
||||
|
||||
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<CustomAttributesProvider>();
|
||||
|
||||
@Inject public DiscoveryNodeService(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) {
|
||||
customAttributesProviders.add(customAttributesProvider);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Map<String, String> buildAttributes() {
|
||||
Map<String, String> attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap());
|
||||
attributes.remove("name"); // name is extracted in other places
|
||||
if (attributes.containsKey("client")) {
|
||||
if (attributes.get("client").equals("false")) {
|
||||
attributes.remove("client"); // this is the default
|
||||
} else {
|
||||
// if we are client node, don't store data ...
|
||||
attributes.put("data", "false");
|
||||
}
|
||||
}
|
||||
if (attributes.containsKey("data")) {
|
||||
if (attributes.get("data").equals("true")) {
|
||||
attributes.remove("data");
|
||||
}
|
||||
}
|
||||
|
||||
for (CustomAttributesProvider provider : customAttributesProviders) {
|
||||
try {
|
||||
Map<String, String> customAttributes = provider.buildAttributes();
|
||||
if (customAttributes != null) {
|
||||
for (Map.Entry<String, String> entry : customAttributes.entrySet()) {
|
||||
if (!attributes.containsKey(entry.getKey())) {
|
||||
attributes.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to build custom attributes from provider [{}]", e, provider);
|
||||
}
|
||||
}
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public static interface CustomAttributesProvider {
|
||||
|
||||
Map<String, String> buildAttributes();
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
@ -46,7 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
|
||||
import static org.elasticsearch.common.collect.Sets.*;
|
||||
|
||||
/**
|
||||
@ -58,6 +58,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final DiscoveryNodeService discoveryNodeService;
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private DiscoveryNode localNode;
|
||||
@ -73,11 +75,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
||||
|
||||
private static final AtomicLong nodeIdGenerator = new AtomicLong();
|
||||
|
||||
@Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService) {
|
||||
@Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
|
||||
DiscoveryNodeService discoveryNodeService) {
|
||||
super(settings);
|
||||
this.clusterName = clusterName;
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.discoveryNodeService = discoveryNodeService;
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
@ -88,7 +92,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
||||
clusterGroups.put(clusterName, clusterGroup);
|
||||
}
|
||||
logger.debug("Connected to cluster [{}]", clusterName);
|
||||
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings));
|
||||
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(),
|
||||
discoveryNodeService.buildAttributes());
|
||||
|
||||
clusterGroup.members().add(this);
|
||||
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.common.UUID;
|
||||
@ -59,7 +60,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
|
||||
import static org.elasticsearch.common.collect.Lists.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
@ -77,6 +77,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final DiscoveryNodeService discoveryNodeService;
|
||||
|
||||
private final ZenPingService pingService;
|
||||
|
||||
private final MasterFaultDetection masterFD;
|
||||
@ -110,12 +112,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||
|
||||
@Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
|
||||
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
|
||||
ZenPingService pingService) {
|
||||
DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
|
||||
super(settings);
|
||||
this.clusterName = clusterName;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.discoveryNodeService = discoveryNodeService;
|
||||
this.pingService = pingService;
|
||||
|
||||
// also support direct discovery.zen settings, for cases when it gets extended
|
||||
@ -138,7 +141,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
Map<String, String> nodeAttributes = buildCommonNodesAttributes(settings);
|
||||
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
|
||||
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
|
||||
String nodeId = UUID.randomBase64UUID();
|
||||
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
|
||||
|
@ -27,6 +27,8 @@ import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
|
||||
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
@ -38,13 +40,16 @@ import org.elasticsearch.common.settings.SettingsFilter;
|
||||
*/
|
||||
public class AwsEc2Service extends AbstractLifecycleComponent<AwsEc2Service> {
|
||||
|
||||
public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
|
||||
|
||||
private AmazonEC2Client client;
|
||||
|
||||
@Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService) {
|
||||
@Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
|
||||
super(settings);
|
||||
settingsFilter.addFilter(new AwsSettingsFilter());
|
||||
// add specific ec2 name resolver
|
||||
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
|
||||
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
|
||||
}
|
||||
|
||||
public synchronized AmazonEC2 client() {
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.cloud.aws.network;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.Closeables;
|
||||
import org.elasticsearch.common.network.NetworkService.CustomNameResolver;
|
||||
@ -78,8 +79,6 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso
|
||||
}
|
||||
}
|
||||
|
||||
private static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
|
||||
|
||||
/**
|
||||
* Construct a {@link CustomNameResolver}.
|
||||
*/
|
||||
@ -97,7 +96,7 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso
|
||||
URLConnection urlConnection = null;
|
||||
InputStream in = null;
|
||||
try {
|
||||
URL url = new URL(EC2_METADATA_URL + type.ec2Name);
|
||||
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + type.ec2Name);
|
||||
logger.debug("obtaining ec2 hostname from ec2 meta-data url {}", url);
|
||||
urlConnection = url.openConnection();
|
||||
urlConnection.setConnectTimeout(2000);
|
||||
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cloud.aws.node;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.Closeables;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class Ec2CustomNodeAttributes extends AbstractComponent implements DiscoveryNodeService.CustomAttributesProvider {
|
||||
|
||||
public Ec2CustomNodeAttributes(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
@Override public Map<String, String> buildAttributes() {
|
||||
if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) {
|
||||
return null;
|
||||
}
|
||||
Map<String, String> ec2Attributes = Maps.newHashMap();
|
||||
|
||||
URLConnection urlConnection;
|
||||
InputStream in = null;
|
||||
try {
|
||||
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + "placement/availability-zone");
|
||||
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
|
||||
urlConnection = url.openConnection();
|
||||
urlConnection.setConnectTimeout(2000);
|
||||
in = urlConnection.getInputStream();
|
||||
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in));
|
||||
|
||||
String metadataResult = urlReader.readLine();
|
||||
if (metadataResult == null || metadataResult.length() == 0) {
|
||||
logger.error("no ec2 metadata returned from {}", url);
|
||||
return null;
|
||||
}
|
||||
ec2Attributes.put("aws_availability_zone", metadataResult);
|
||||
} catch (IOException e) {
|
||||
logger.debug("failed to get metadata for [placement/availability-zone]: " + ExceptionsHelper.detailedMessage(e));
|
||||
} finally {
|
||||
Closeables.closeQuietly(in);
|
||||
}
|
||||
|
||||
return ec2Attributes;
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ package org.elasticsearch.discovery.ec2;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -39,8 +40,9 @@ import org.elasticsearch.transport.TransportService;
|
||||
public class Ec2Discovery extends ZenDiscovery {
|
||||
|
||||
@Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) {
|
||||
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, pingService);
|
||||
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService,
|
||||
DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service) {
|
||||
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, discoveryNodeService, pingService);
|
||||
if (settings.getAsBoolean("cloud.enabled", true)) {
|
||||
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
|
||||
UnicastZenPing unicastZenPing = null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user