mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
[discovery-ec2] Move integration tests to unit tests
Follow up for #12844 but in master branch where cloud-aws has been split in 2 projects. So we need to backport manually changes...
This commit is contained in:
parent
aebd8da7a4
commit
42237ed982
@ -19,157 +19,37 @@
|
||||
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.*;
|
||||
import com.amazonaws.internal.StaticCredentialsProvider;
|
||||
import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
|
||||
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AwsEc2Service extends AbstractLifecycleComponent<AwsEc2Service> {
|
||||
|
||||
public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
|
||||
|
||||
private AmazonEC2Client client;
|
||||
|
||||
@Inject
|
||||
public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
|
||||
super(settings);
|
||||
settingsFilter.addFilter("cloud.aws.access_key");
|
||||
settingsFilter.addFilter("cloud.aws.secret_key");
|
||||
// Filter repository-specific settings
|
||||
settingsFilter.addFilter("access_key");
|
||||
settingsFilter.addFilter("secret_key");
|
||||
// add specific ec2 name resolver
|
||||
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
|
||||
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
|
||||
public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {
|
||||
final class CLOUD_AWS {
|
||||
public static final String KEY = "cloud.aws.access_key";
|
||||
public static final String SECRET = "cloud.aws.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.proxy_host";
|
||||
public static final String PROXY_PORT = "cloud.aws.proxy_port";
|
||||
public static final String SIGNER = "cloud.aws.signer";
|
||||
public static final String REGION = "cloud.aws.region";
|
||||
}
|
||||
|
||||
public synchronized AmazonEC2 client() {
|
||||
if (client != null) {
|
||||
return client;
|
||||
}
|
||||
|
||||
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||
// the response metadata cache is only there for diagnostics purposes,
|
||||
// but can force objects from every response to the old generation.
|
||||
clientConfiguration.setResponseMetadataCacheSize(0);
|
||||
String protocol = settings.get("cloud.aws.protocol", "https").toLowerCase(Locale.ROOT);
|
||||
protocol = settings.get("cloud.aws.ec2.protocol", protocol).toLowerCase(Locale.ROOT);
|
||||
if ("http".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTP);
|
||||
} else if ("https".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTPS);
|
||||
} else {
|
||||
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
|
||||
}
|
||||
String account = settings.get("cloud.aws.access_key");
|
||||
String key = settings.get("cloud.aws.secret_key");
|
||||
|
||||
String proxyHost = settings.get("cloud.aws.proxy_host");
|
||||
proxyHost = settings.get("cloud.aws.ec2.proxy_host", proxyHost);
|
||||
if (proxyHost != null) {
|
||||
String portString = settings.get("cloud.aws.proxy_port", "80");
|
||||
portString = settings.get("cloud.aws.ec2.proxy_port", portString);
|
||||
Integer proxyPort;
|
||||
try {
|
||||
proxyPort = Integer.parseInt(portString, 10);
|
||||
} catch (NumberFormatException ex) {
|
||||
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
|
||||
}
|
||||
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
|
||||
}
|
||||
|
||||
// #155: we might have 3rd party users using older EC2 API version
|
||||
String awsSigner = settings.get("cloud.aws.ec2.signer", settings.get("cloud.aws.signer"));
|
||||
if (awsSigner != null) {
|
||||
logger.debug("using AWS API signer [{}]", awsSigner);
|
||||
AwsSigner.configureSigner(awsSigner, clientConfiguration);
|
||||
}
|
||||
|
||||
AWSCredentialsProvider credentials;
|
||||
|
||||
if (account == null && key == null) {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new EnvironmentVariableCredentialsProvider(),
|
||||
new SystemPropertiesCredentialsProvider(),
|
||||
new InstanceProfileCredentialsProvider()
|
||||
);
|
||||
} else {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
|
||||
);
|
||||
}
|
||||
|
||||
this.client = new AmazonEC2Client(credentials, clientConfiguration);
|
||||
|
||||
if (settings.get("cloud.aws.ec2.endpoint") != null) {
|
||||
String endpoint = settings.get("cloud.aws.ec2.endpoint");
|
||||
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
|
||||
client.setEndpoint(endpoint);
|
||||
} else if (settings.get("cloud.aws.region") != null) {
|
||||
String region = settings.get("cloud.aws.region").toLowerCase(Locale.ROOT);
|
||||
String endpoint;
|
||||
if (region.equals("us-east-1") || region.equals("us-east")) {
|
||||
endpoint = "ec2.us-east-1.amazonaws.com";
|
||||
} else if (region.equals("us-west") || region.equals("us-west-1")) {
|
||||
endpoint = "ec2.us-west-1.amazonaws.com";
|
||||
} else if (region.equals("us-west-2")) {
|
||||
endpoint = "ec2.us-west-2.amazonaws.com";
|
||||
} else if (region.equals("ap-southeast") || region.equals("ap-southeast-1")) {
|
||||
endpoint = "ec2.ap-southeast-1.amazonaws.com";
|
||||
} else if (region.equals("ap-southeast-2")) {
|
||||
endpoint = "ec2.ap-southeast-2.amazonaws.com";
|
||||
} else if (region.equals("ap-northeast") || region.equals("ap-northeast-1")) {
|
||||
endpoint = "ec2.ap-northeast-1.amazonaws.com";
|
||||
} else if (region.equals("eu-west") || region.equals("eu-west-1")) {
|
||||
endpoint = "ec2.eu-west-1.amazonaws.com";
|
||||
} else if (region.equals("eu-central") || region.equals("eu-central-1")) {
|
||||
endpoint = "ec2.eu-central-1.amazonaws.com";
|
||||
} else if (region.equals("sa-east") || region.equals("sa-east-1")) {
|
||||
endpoint = "ec2.sa-east-1.amazonaws.com";
|
||||
} else if (region.equals("cn-north") || region.equals("cn-north-1")) {
|
||||
endpoint = "ec2.cn-north-1.amazonaws.com.cn";
|
||||
} else {
|
||||
throw new IllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
|
||||
}
|
||||
if (endpoint != null) {
|
||||
logger.debug("using ec2 region [{}], with endpoint [{}]", region, endpoint);
|
||||
client.setEndpoint(endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
return this.client;
|
||||
|
||||
final class CLOUD_EC2 {
|
||||
public static final String KEY = "cloud.aws.ec2.access_key";
|
||||
public static final String SECRET = "cloud.aws.ec2.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.ec2.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.ec2.proxy_host";
|
||||
public static final String PROXY_PORT = "cloud.aws.ec2.proxy_port";
|
||||
public static final String SIGNER = "cloud.aws.ec2.signer";
|
||||
public static final String ENDPOINT = "cloud.aws.ec2.endpoint";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticsearchException {
|
||||
final class DISCOVERY_EC2 {
|
||||
public static final String HOST_TYPE = "discovery.ec2.host_type";
|
||||
public static final String ANY_GROUP = "discovery.ec2.any_group";
|
||||
public static final String GROUPS = "discovery.ec2.groups";
|
||||
public static final String TAG_PREFIX = "discovery.ec2.tag.";
|
||||
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticsearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticsearchException {
|
||||
if (client != null) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
AmazonEC2 client();
|
||||
}
|
||||
|
@ -0,0 +1,173 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.*;
|
||||
import com.amazonaws.internal.StaticCredentialsProvider;
|
||||
import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
|
||||
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service> implements AwsEc2Service {
|
||||
|
||||
public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
|
||||
|
||||
private AmazonEC2Client client;
|
||||
|
||||
@Inject
|
||||
public AwsEc2ServiceImpl(Settings settings, SettingsFilter settingsFilter, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
|
||||
super(settings);
|
||||
// Filter global settings
|
||||
settingsFilter.addFilter(CLOUD_AWS.KEY);
|
||||
settingsFilter.addFilter(CLOUD_AWS.SECRET);
|
||||
settingsFilter.addFilter(CLOUD_EC2.KEY);
|
||||
settingsFilter.addFilter(CLOUD_EC2.SECRET);
|
||||
// add specific ec2 name resolver
|
||||
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
|
||||
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
|
||||
}
|
||||
|
||||
public synchronized AmazonEC2 client() {
|
||||
if (client != null) {
|
||||
return client;
|
||||
}
|
||||
|
||||
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||
// the response metadata cache is only there for diagnostics purposes,
|
||||
// but can force objects from every response to the old generation.
|
||||
clientConfiguration.setResponseMetadataCacheSize(0);
|
||||
String protocol = settings.get(CLOUD_EC2.PROTOCOL, settings.get(CLOUD_AWS.PROTOCOL, "https")).toLowerCase(Locale.ROOT);
|
||||
if ("http".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTP);
|
||||
} else if ("https".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTPS);
|
||||
} else {
|
||||
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
|
||||
}
|
||||
String account = settings.get(CLOUD_EC2.KEY, settings.get(CLOUD_AWS.KEY));
|
||||
String key = settings.get(CLOUD_EC2.SECRET, settings.get(CLOUD_AWS.SECRET));
|
||||
|
||||
String proxyHost = settings.get(CLOUD_EC2.PROXY_HOST, settings.get(CLOUD_AWS.PROXY_HOST));
|
||||
if (proxyHost != null) {
|
||||
String portString = settings.get(CLOUD_EC2.PROXY_PORT, settings.get(CLOUD_AWS.PROXY_PORT, "80"));
|
||||
Integer proxyPort;
|
||||
try {
|
||||
proxyPort = Integer.parseInt(portString, 10);
|
||||
} catch (NumberFormatException ex) {
|
||||
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
|
||||
}
|
||||
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
|
||||
}
|
||||
|
||||
// #155: we might have 3rd party users using older EC2 API version
|
||||
String awsSigner = settings.get(CLOUD_EC2.SIGNER, settings.get(CLOUD_AWS.SIGNER));
|
||||
if (awsSigner != null) {
|
||||
logger.debug("using AWS API signer [{}]", awsSigner);
|
||||
try {
|
||||
AwsSigner.configureSigner(awsSigner, clientConfiguration);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.warn("wrong signer set for [{}] or [{}]: [{}]",
|
||||
CLOUD_EC2.SIGNER, CLOUD_AWS.SIGNER, awsSigner);
|
||||
}
|
||||
}
|
||||
|
||||
AWSCredentialsProvider credentials;
|
||||
|
||||
if (account == null && key == null) {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new EnvironmentVariableCredentialsProvider(),
|
||||
new SystemPropertiesCredentialsProvider(),
|
||||
new InstanceProfileCredentialsProvider()
|
||||
);
|
||||
} else {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
|
||||
);
|
||||
}
|
||||
|
||||
this.client = new AmazonEC2Client(credentials, clientConfiguration);
|
||||
|
||||
if (settings.get(CLOUD_EC2.ENDPOINT) != null) {
|
||||
String endpoint = settings.get(CLOUD_EC2.ENDPOINT);
|
||||
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
|
||||
client.setEndpoint(endpoint);
|
||||
} else if (settings.get(CLOUD_AWS.REGION) != null) {
|
||||
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
|
||||
String endpoint;
|
||||
if (region.equals("us-east-1") || region.equals("us-east")) {
|
||||
endpoint = "ec2.us-east-1.amazonaws.com";
|
||||
} else if (region.equals("us-west") || region.equals("us-west-1")) {
|
||||
endpoint = "ec2.us-west-1.amazonaws.com";
|
||||
} else if (region.equals("us-west-2")) {
|
||||
endpoint = "ec2.us-west-2.amazonaws.com";
|
||||
} else if (region.equals("ap-southeast") || region.equals("ap-southeast-1")) {
|
||||
endpoint = "ec2.ap-southeast-1.amazonaws.com";
|
||||
} else if (region.equals("ap-southeast-2")) {
|
||||
endpoint = "ec2.ap-southeast-2.amazonaws.com";
|
||||
} else if (region.equals("ap-northeast") || region.equals("ap-northeast-1")) {
|
||||
endpoint = "ec2.ap-northeast-1.amazonaws.com";
|
||||
} else if (region.equals("eu-west") || region.equals("eu-west-1")) {
|
||||
endpoint = "ec2.eu-west-1.amazonaws.com";
|
||||
} else if (region.equals("eu-central") || region.equals("eu-central-1")) {
|
||||
endpoint = "ec2.eu-central-1.amazonaws.com";
|
||||
} else if (region.equals("sa-east") || region.equals("sa-east-1")) {
|
||||
endpoint = "ec2.sa-east-1.amazonaws.com";
|
||||
} else if (region.equals("cn-north") || region.equals("cn-north-1")) {
|
||||
endpoint = "ec2.cn-north-1.amazonaws.com.cn";
|
||||
} else {
|
||||
throw new IllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
|
||||
}
|
||||
logger.debug("using ec2 region [{}], with endpoint [{}]", region, endpoint);
|
||||
client.setEndpoint(endpoint);
|
||||
}
|
||||
|
||||
return this.client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticsearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticsearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticsearchException {
|
||||
if (client != null) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
@ -25,6 +25,6 @@ public class Ec2Module extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(AwsEc2Service.class).asEagerSingleton();
|
||||
bind(AwsEc2Service.class).to(AwsEc2ServiceImpl.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@
|
||||
package org.elasticsearch.cloud.aws.network;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.network.NetworkService.CustomNameResolver;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -96,7 +96,7 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso
|
||||
URLConnection urlConnection = null;
|
||||
InputStream in = null;
|
||||
try {
|
||||
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + type.ec2Name);
|
||||
URL url = new URL(AwsEc2ServiceImpl.EC2_METADATA_URL + type.ec2Name);
|
||||
logger.debug("obtaining ec2 hostname from ec2 meta-data url {}", url);
|
||||
urlConnection = url.openConnection();
|
||||
urlConnection.setConnectTimeout(2000);
|
||||
|
@ -20,7 +20,7 @@
|
||||
package org.elasticsearch.cloud.aws.node;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -53,7 +53,7 @@ public class Ec2CustomNodeAttributes extends AbstractComponent implements Discov
|
||||
URLConnection urlConnection;
|
||||
InputStream in = null;
|
||||
try {
|
||||
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + "placement/availability-zone");
|
||||
URL url = new URL(AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
|
||||
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
|
||||
urlConnection = url.openConnection();
|
||||
urlConnection.setConnectTimeout(2000);
|
||||
|
@ -24,6 +24,7 @@ import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import com.amazonaws.services.ec2.model.*;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
@ -31,7 +32,6 @@ import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.*;
|
||||
@ -71,18 +71,19 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||
this.client = awsEc2Service.client();
|
||||
this.version = version;
|
||||
|
||||
this.hostType = HostType.valueOf(settings.get("discovery.ec2.host_type", "private_ip").toUpperCase(Locale.ROOT));
|
||||
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
||||
.toUpperCase(Locale.ROOT));
|
||||
|
||||
this.bindAnyGroup = settings.getAsBoolean("discovery.ec2.any_group", true);
|
||||
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
|
||||
this.groups = new HashSet<>();
|
||||
groups.addAll(Arrays.asList(settings.getAsArray("discovery.ec2.groups")));
|
||||
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
|
||||
|
||||
this.tags = settings.getByPrefix("discovery.ec2.tag.").getAsMap();
|
||||
this.tags = settings.getByPrefix(DISCOVERY_EC2.TAG_PREFIX).getAsMap();
|
||||
|
||||
Set<String> availabilityZones = new HashSet<>();
|
||||
availabilityZones.addAll(Arrays.asList(settings.getAsArray("discovery.ec2.availability_zones")));
|
||||
if (settings.get("discovery.ec2.availability_zones") != null) {
|
||||
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get("discovery.ec2.availability_zones")));
|
||||
availabilityZones.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.AVAILABILITY_ZONES)));
|
||||
if (settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES) != null) {
|
||||
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES)));
|
||||
}
|
||||
this.availabilityZones = availabilityZones;
|
||||
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
package org.elasticsearch.plugin.discovery.ec2;
|
||||
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
import org.elasticsearch.cloud.aws.Ec2Module;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
@ -75,7 +75,7 @@ public class Ec2DiscoveryPlugin extends Plugin {
|
||||
@Override
|
||||
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
|
||||
Collection<Class<? extends LifecycleComponent>> services = new ArrayList<>();
|
||||
services.add(AwsEc2Service.class);
|
||||
services.add(AwsEc2ServiceImpl.class);
|
||||
return services;
|
||||
}
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.ec2;
|
||||
|
||||
import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import com.amazonaws.services.ec2.model.Tag;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class AwsEc2ServiceMock extends AbstractLifecycleComponent<AwsEc2Service> implements AwsEc2Service {
|
||||
|
||||
private int nodes;
|
||||
private List<List<Tag>> tagsList;
|
||||
private AmazonEC2 client;
|
||||
|
||||
public AwsEc2ServiceMock(Settings settings, int nodes, List<List<Tag>> tagsList) {
|
||||
super(settings);
|
||||
this.nodes = nodes;
|
||||
this.tagsList = tagsList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonEC2 client() {
|
||||
if (client == null) {
|
||||
client = new AmazonEC2Mock(nodes, tagsList);
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
|
||||
}
|
||||
}
|
@ -19,31 +19,218 @@
|
||||
|
||||
package org.elasticsearch.discovery.ec2;
|
||||
|
||||
|
||||
import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
|
||||
import com.amazonaws.services.ec2.model.Tag;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* Just an empty Node Start test to check everything if fine when
|
||||
* starting.
|
||||
* This test requires AWS to run.
|
||||
*
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
|
||||
public class Ec2DiscoveryTests extends AbstractAwsTestCase {
|
||||
public class Ec2DiscoveryTests extends ESTestCase {
|
||||
|
||||
protected static ThreadPool threadPool;
|
||||
protected MockTransportService transportService;
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
threadPool = new ThreadPool(Ec2DiscoveryTests.class.getName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopThreadPool() throws InterruptedException {
|
||||
if (threadPool !=null) {
|
||||
terminate(threadPool);
|
||||
threadPool = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createTransportService() {
|
||||
transportService = new MockTransportService(
|
||||
Settings.EMPTY,
|
||||
new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool);
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
|
||||
return buildDynamicNodes(nodeSettings, nodes, null);
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
|
||||
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(nodeSettings, nodes, tagsList);
|
||||
|
||||
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService,
|
||||
awsEc2Service, Version.CURRENT);
|
||||
|
||||
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
|
||||
logger.debug("--> nodes found: {}", discoveryNodes);
|
||||
return discoveryNodes;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStart() {
|
||||
Settings nodeSettings = settingsBuilder()
|
||||
.put("plugin.types", Ec2DiscoveryPlugin.class.getName())
|
||||
.put("discovery.type", "ec2")
|
||||
public void defaultSettings() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.build();
|
||||
internalCluster().startNode(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void privateIp() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = new LocalTransportAddress(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
|
||||
assertThat(address.sameHost(expected), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicIp() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "public_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = new LocalTransportAddress(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
|
||||
assertThat(address.sameHost(expected), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void privateDns() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "private_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
String instanceId = "node" + node++;
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = new LocalTransportAddress(
|
||||
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
|
||||
assertThat(address.sameHost(expected), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicDns() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "public_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
String instanceId = "node" + node++;
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = new LocalTransportAddress(
|
||||
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
|
||||
assertThat(address.sameHost(expected), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void invalidHostType() throws InterruptedException {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "does_not_exist")
|
||||
.build();
|
||||
buildDynamicNodes(nodeSettings, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterByTags() throws InterruptedException {
|
||||
int nodes = randomIntBetween(5, 10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod")
|
||||
.build();
|
||||
|
||||
int prodInstances = 0;
|
||||
List<List<Tag>> tagsList = new ArrayList<>();
|
||||
|
||||
for (int node = 0; node < nodes; node++) {
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
if (randomBoolean()) {
|
||||
tags.add(new Tag("stage", "prod"));
|
||||
prodInstances++;
|
||||
} else {
|
||||
tags.add(new Tag("stage", "dev"));
|
||||
}
|
||||
tagsList.add(tags);
|
||||
}
|
||||
|
||||
logger.info("started [{}] instances with [{}] stage=prod tag");
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
|
||||
assertThat(discoveryNodes, hasSize(prodInstances));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterByMultipleTags() throws InterruptedException {
|
||||
int nodes = randomIntBetween(5, 10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.putArray(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod", "preprod")
|
||||
.build();
|
||||
|
||||
int prodInstances = 0;
|
||||
List<List<Tag>> tagsList = new ArrayList<>();
|
||||
|
||||
for (int node = 0; node < nodes; node++) {
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
if (randomBoolean()) {
|
||||
tags.add(new Tag("stage", "prod"));
|
||||
if (randomBoolean()) {
|
||||
tags.add(new Tag("stage", "preprod"));
|
||||
prodInstances++;
|
||||
}
|
||||
} else {
|
||||
tags.add(new Tag("stage", "dev"));
|
||||
if (randomBoolean()) {
|
||||
tags.add(new Tag("stage", "preprod"));
|
||||
}
|
||||
}
|
||||
tagsList.add(tags);
|
||||
}
|
||||
|
||||
logger.info("started [{}] instances with [{}] stage=prod tag");
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
|
||||
assertThat(discoveryNodes, hasSize(prodInstances));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user