Merge branch 'pr/16477-aws-settings'

This commit is contained in:
David Pilato 2016-02-11 19:47:43 +01:00
commit df50371c34
21 changed files with 1196 additions and 344 deletions

View File

@ -107,6 +107,18 @@ public class SettingsModule extends AbstractModule {
}
}
/**
* Check if a setting has already been registered
*/
public boolean exists(Setting<?> setting) {
switch (setting.getScope()) {
case CLUSTER:
return clusterSettings.containsKey(setting.getKey());
case INDEX:
return indexSettings.containsKey(setting.getKey());
}
throw new IllegalArgumentException("setting scope is unknown. This should never happen!");
}
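For context, the guarded-registration pattern that this new check enables — used later in this commit by Ec2DiscoveryPlugin so that discovery-ec2 and repository-s3 can share the cloud.aws.* settings without a duplicate-registration error — looks like this (illustrative sketch only):

    private void registerSettingIfMissing(SettingsModule settingsModule, Setting<?> setting) {
        // Only register the shared setting if another plugin has not already done so.
        if (settingsModule.exists(setting) == false) {
            settingsModule.registerSetting(setting);
        }
    }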
private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);

View File

@ -19,42 +19,179 @@
package org.elasticsearch.cloud.aws;

import com.amazonaws.Protocol;
import com.amazonaws.services.ec2.AmazonEC2;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.component.LifecycleComponent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {

final class CLOUD_AWS {
public static final String KEY = "cloud.aws.access_key";
public static final String SECRET = "cloud.aws.secret_key";
public static final String PROTOCOL = "cloud.aws.protocol";
public static final String PROXY_HOST = "cloud.aws.proxy.host";
public static final String PROXY_PORT = "cloud.aws.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
public static final String SIGNER = "cloud.aws.signer";
public static final String REGION = "cloud.aws.region";

public interface AwsEc2Service {

Setting<Boolean> AUTO_ATTRIBUTE_SETTING = Setting.boolSetting("cloud.node.auto_attributes", false, false, Setting.Scope.CLUSTER);

// Global AWS settings (shared between discovery-ec2 and repository-s3)
// Each setting starting with `cloud.aws` also exists in repository-s3 project. Don't forget to update
// the code there if you change anything here.

/**
* cloud.aws.access_key: AWS Access key. Shared with repository-s3 plugin
*/
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);

/**
* cloud.aws.secret_key: AWS Secret key. Shared with repository-s3 plugin
*/
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.protocol: Protocol for AWS API: http or https. Defaults to https. Shared with repository-s3 plugin
*/
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)),
false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.host: In case of proxy, define its hostname/IP. Shared with repository-s3 plugin
*/
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.port: In case of proxy, define its port. Defaults to 80. Shared with repository-s3 plugin
*/
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.username: In case of proxy with auth, define the username. Shared with repository-s3 plugin
*/
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.password: In case of proxy with auth, define the password. Shared with repository-s3 plugin
*/
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.signer: If you are using an old AWS API version, you can define a Signer. Shared with repository-s3 plugin
*/
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.region: Region. Shared with repository-s3 plugin
*/
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* Defines specific ec2 settings starting with cloud.aws.ec2.
*/
interface CLOUD_EC2 {
/**
* cloud.aws.ec2.access_key: AWS Access key specific for EC2 API calls. Defaults to cloud.aws.access_key.
* @see AwsEc2Service#KEY_SETTING
*/
Setting<String> KEY_SETTING = new Setting<>("cloud.aws.ec2.access_key", AwsEc2Service.KEY_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.secret_key: AWS Secret key specific for EC2 API calls. Defaults to cloud.aws.secret_key.
* @see AwsEc2Service#SECRET_SETTING
*/
Setting<String> SECRET_SETTING = new Setting<>("cloud.aws.ec2.secret_key", AwsEc2Service.SECRET_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.protocol: Protocol for AWS API specific for EC2 API calls: http or https. Defaults to cloud.aws.protocol.
* @see AwsEc2Service#PROTOCOL_SETTING
*/
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.ec2.protocol", AwsEc2Service.PROTOCOL_SETTING,
s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.proxy.host: In case of proxy, define its hostname/IP specific for EC2 API calls. Defaults to cloud.aws.proxy.host.
* @see AwsEc2Service#PROXY_HOST_SETTING
*/
Setting<String> PROXY_HOST_SETTING = new Setting<>("cloud.aws.ec2.proxy.host", AwsEc2Service.PROXY_HOST_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.proxy.port: In case of proxy, define its port specific for EC2 API calls. Defaults to cloud.aws.proxy.port.
* @see AwsEc2Service#PROXY_PORT_SETTING
*/
Setting<Integer> PROXY_PORT_SETTING = new Setting<>("cloud.aws.ec2.proxy.port", AwsEc2Service.PROXY_PORT_SETTING,
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.ec2.proxy.port"), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.proxy.username: In case of proxy with auth, define the username specific for EC2 API calls.
* Defaults to cloud.aws.proxy.username.
* @see AwsEc2Service#PROXY_USERNAME_SETTING
*/
Setting<String> PROXY_USERNAME_SETTING = new Setting<>("cloud.aws.ec2.proxy.username", AwsEc2Service.PROXY_USERNAME_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.proxy.password: In case of proxy with auth, define the password specific for EC2 API calls.
* Defaults to cloud.aws.proxy.password.
* @see AwsEc2Service#PROXY_PASSWORD_SETTING
*/
Setting<String> PROXY_PASSWORD_SETTING = new Setting<>("cloud.aws.ec2.proxy.password", AwsEc2Service.PROXY_PASSWORD_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.signer: If you are using an old AWS API version, you can define a Signer. Specific for EC2 API calls.
* Defaults to cloud.aws.signer.
* @see AwsEc2Service#SIGNER_SETTING
*/
Setting<String> SIGNER_SETTING = new Setting<>("cloud.aws.ec2.signer", AwsEc2Service.SIGNER_SETTING, Function.identity(),
false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.region: Region specific for EC2 API calls. Defaults to cloud.aws.region.
* @see AwsEc2Service#REGION_SETTING
*/
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.ec2.region", AwsEc2Service.REGION_SETTING,
s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.ec2.endpoint: Endpoint. If not set, endpoint will be guessed based on region setting.
*/
Setting<String> ENDPOINT_SETTING = Setting.simpleString("cloud.aws.ec2.endpoint", false, Setting.Scope.CLUSTER);
}

final class CLOUD_EC2 {
public static final String KEY = "cloud.aws.ec2.access_key";
public static final String SECRET = "cloud.aws.ec2.secret_key";
public static final String PROTOCOL = "cloud.aws.ec2.protocol";
public static final String PROXY_HOST = "cloud.aws.ec2.proxy.host";
public static final String PROXY_PORT = "cloud.aws.ec2.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.ec2.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.ec2.proxy.password";
public static final String SIGNER = "cloud.aws.ec2.signer";
public static final String ENDPOINT = "cloud.aws.ec2.endpoint";
}

final class DISCOVERY_EC2 {
public static final String HOST_TYPE = "discovery.ec2.host_type";
public static final String ANY_GROUP = "discovery.ec2.any_group";
public static final String GROUPS = "discovery.ec2.groups";
public static final String TAG_PREFIX = "discovery.ec2.tag.";
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time";

/**
* Defines discovery settings for ec2. Starting with discovery.ec2.
*/
interface DISCOVERY_EC2 {
enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}

/**
* discovery.ec2.host_type: The type of host type to use to communicate with other instances.
* Can be one of private_ip, public_ip, private_dns, public_dns. Defaults to private_ip.
*/
Setting<HostType> HOST_TYPE_SETTING =
new Setting<>("discovery.ec2.host_type", HostType.PRIVATE_IP.name(), s -> HostType.valueOf(s.toUpperCase(Locale.ROOT)), false,
Setting.Scope.CLUSTER);
/**
* discovery.ec2.any_group: If set to false, will require all security groups to be present for the instance to be used for the
* discovery. Defaults to true.
*/
Setting<Boolean> ANY_GROUP_SETTING =
Setting.boolSetting("discovery.ec2.any_group", true, false, Setting.Scope.CLUSTER);
/**
* discovery.ec2.groups: Either a comma separated list or array based list of (security) groups. Only instances with the provided
* security groups will be used in the cluster discovery. (NOTE: You could provide either group NAME or group ID.)
*/
Setting<List<String>> GROUPS_SETTING =
Setting.listSetting("discovery.ec2.groups", new ArrayList<>(), s -> s.toString(), false, Setting.Scope.CLUSTER);
/**
* discovery.ec2.availability_zones: Either a comma separated list or array based list of availability zones. Only instances within
* the provided availability zones will be used in the cluster discovery.
*/
Setting<List<String>> AVAILABILITY_ZONES_SETTING =
Setting.listSetting("discovery.ec2.availability_zones", Collections.emptyList(), s -> s.toString(), false,
Setting.Scope.CLUSTER);
/**
* discovery.ec2.node_cache_time: How long the list of hosts is cached to prevent further requests to the AWS API. Defaults to 10s.
*/
Setting<TimeValue> NODE_CACHE_TIME_SETTING =
Setting.timeSetting("discovery.ec2.node_cache_time", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
/**
* discovery.ec2.tag.*: The ec2 discovery can filter machines to include in the cluster based on tags (and not just groups).
* The settings to use include the discovery.ec2.tag. prefix. For example, setting discovery.ec2.tag.stage to dev will only filter
* instances with a tag key set to stage, and a value of dev. Several tags set will require all of those tags to be set for the
* instance to be included.
*/
Setting<Settings> TAG_SETTING = Setting.groupSetting("discovery.ec2.tag.", false,Setting.Scope.CLUSTER);
}

AmazonEC2 client();
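A minimal sketch of how the per-service fallback resolves (it mirrors the assertions in Ec2DiscoverySettingsTests further down in this commit; the values are arbitrary):

    Settings settings = Settings.builder()
        .put("cloud.aws.access_key", "global-key")     // shared default
        .put("cloud.aws.ec2.access_key", "ec2-key")    // EC2-specific override
        .build();
    String shared = AwsEc2Service.KEY_SETTING.get(settings);            // "global-key"
    String forEc2 = AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(settings);  // "ec2-key"
    // Without the cloud.aws.ec2.access_key override, CLOUD_EC2.KEY_SETTING would return
    // "global-key", because it was declared with AwsEc2Service.KEY_SETTING as its fallback.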

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.BasicAWSCredentials;
@ -33,18 +32,17 @@ import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;

import java.util.Locale;
import java.util.Random;

/**
@ -74,30 +72,15 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);

String protocol = settings.get(CLOUD_EC2.PROTOCOL, settings.get(CLOUD_AWS.PROTOCOL, "https")).toLowerCase(Locale.ROOT);
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = settings.get(CLOUD_EC2.KEY, settings.get(CLOUD_AWS.KEY));
String key = settings.get(CLOUD_EC2.SECRET, settings.get(CLOUD_AWS.SECRET));
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
proxyHost = settings.get(CLOUD_EC2.PROXY_HOST, proxyHost);
if (proxyHost != null) {
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
portString = settings.get(CLOUD_EC2.PROXY_PORT, portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
String proxyUsername = settings.get(CLOUD_EC2.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
String proxyPassword = settings.get(CLOUD_EC2.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));

clientConfiguration.setProtocol(CLOUD_EC2.PROTOCOL_SETTING.get(settings));
String key = CLOUD_EC2.KEY_SETTING.get(settings);
String secret = CLOUD_EC2.SECRET_SETTING.get(settings);
String proxyHost = CLOUD_EC2.PROXY_HOST_SETTING.get(settings);
if (proxyHost != null) {
Integer proxyPort = CLOUD_EC2.PROXY_PORT_SETTING.get(settings);
String proxyUsername = CLOUD_EC2.PROXY_USERNAME_SETTING.get(settings);
String proxyPassword = CLOUD_EC2.PROXY_PASSWORD_SETTING.get(settings);

clientConfiguration
.withProxyHost(proxyHost)
@ -107,15 +90,10 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
}

// #155: we might have 3rd party users using older EC2 API version
String awsSigner = settings.get(CLOUD_EC2.SIGNER, settings.get(CLOUD_AWS.SIGNER));
if (awsSigner != null) {
logger.debug("using AWS API signer [{}]", awsSigner);
try {
AwsSigner.configureSigner(awsSigner, clientConfiguration);
} catch (IllegalArgumentException e) {
logger.warn("wrong signer set for [{}] or [{}]: [{}]",
CLOUD_EC2.SIGNER, CLOUD_AWS.SIGNER, awsSigner);
}
}

String awsSigner = CLOUD_EC2.SIGNER_SETTING.get(settings);
if (Strings.hasText(awsSigner)) {
logger.debug("using AWS API signer [{}]", awsSigner);
AwsSigner.configureSigner(awsSigner, clientConfiguration);
}

// Increase the number of retries in case of 5xx API responses
@ -138,7 +116,7 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
AWSCredentialsProvider credentials;

if (account == null && key == null) {
if (key == null && secret == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
@ -146,19 +124,18 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
new StaticCredentialsProvider(new BasicAWSCredentials(key, secret))
);
}

this.client = new AmazonEC2Client(credentials, clientConfiguration);

if (settings.get(CLOUD_EC2.ENDPOINT) != null) {
String endpoint = settings.get(CLOUD_EC2.ENDPOINT);
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
client.setEndpoint(endpoint);
} else if (settings.get(CLOUD_AWS.REGION) != null) {
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
String endpoint;

String endpoint = CLOUD_EC2.ENDPOINT_SETTING.get(settings);
if (endpoint != null) {
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
client.setEndpoint(endpoint);
} else if (CLOUD_EC2.REGION_SETTING.exists(settings)) {
String region = CLOUD_EC2.REGION_SETTING.get(settings);

if (region.equals("us-east-1") || region.equals("us-east")) {
endpoint = "ec2.us-east-1.amazonaws.com";
} else if (region.equals("us-west") || region.equals("us-west-1")) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.aws.node;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractComponent;
@ -45,7 +46,7 @@ public class Ec2CustomNodeAttributes extends AbstractComponent implements Discov
@Override
public Map<String, String> buildAttributes() {
if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return null;
}
Map<String, String> ec2Attributes = new HashMap<>();
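As a hedged illustration of the new toggle (the attribute lookup itself is unchanged and not shown here):

    Settings defaults = Settings.EMPTY;
    // cloud.node.auto_attributes defaults to false, so buildAttributes() returns null.
    boolean off = AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(defaults);   // false

    Settings enabled = Settings.builder()
        .put(AwsEc2Service.AUTO_ATTRIBUTE_SETTING.getKey(), true)
        .build();
    boolean on = AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(enabled);     // true, attributes are built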

View File

@ -31,7 +31,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -42,11 +41,9 @@ import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -55,13 +52,6 @@ import java.util.Set;
*/
public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
private static enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}
private final TransportService transportService;
private final AmazonEC2 client;
@ -76,7 +66,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final Set<String> availabilityZones;

private final HostType hostType;
private final DISCOVERY_EC2.HostType hostType;

private final DiscoNodesCache discoNodes;
@ -87,24 +77,17 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
this.client = awsEc2Service.client();
this.version = version;

this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
.toUpperCase(Locale.ROOT));
this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME,
TimeValue.timeValueMillis(10_000L)));
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);

this.hostType = DISCOVERY_EC2.HOST_TYPE_SETTING.get(settings);
this.discoNodes = new DiscoNodesCache(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.get(settings));
this.bindAnyGroup = DISCOVERY_EC2.ANY_GROUP_SETTING.get(settings);

this.groups = new HashSet<>();
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
this.groups.addAll(DISCOVERY_EC2.GROUPS_SETTING.get(settings));

this.tags = settings.getByPrefix(DISCOVERY_EC2.TAG_PREFIX).getAsMap();
this.tags = DISCOVERY_EC2.TAG_SETTING.get(settings).getAsMap();

Set<String> availabilityZones = new HashSet<>();
availabilityZones.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.AVAILABILITY_ZONES)));
if (settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES) != null) {
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES)));
}
this.availabilityZones = availabilityZones;

this.availabilityZones = new HashSet<>();
availabilityZones.addAll(DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING.get(settings));
if (logger.isDebugEnabled()) {
logger.debug("using host_type [{}], tags [{}], groups [{}] with any_group [{}], availability_zones [{}]", hostType, tags, groups, bindAnyGroup, availabilityZones);

View File

@ -19,11 +19,6 @@
package org.elasticsearch.plugin.discovery.ec2;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;

import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
@ -32,6 +27,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.discovery.DiscoveryModule;
@ -39,6 +35,11 @@ import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.ec2.Ec2Discovery;
import org.elasticsearch.plugins.Plugin;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;

/**
*
*/
@ -104,12 +105,51 @@ public class Ec2DiscoveryPlugin extends Plugin {
}

public void onModule(SettingsModule settingsModule) {
// Register global cloud aws settings: cloud.aws (might have been registered in ec2 plugin)
registerSettingIfMissing(settingsModule, AwsEc2Service.KEY_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.SECRET_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.PROTOCOL_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.PROXY_HOST_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.PROXY_PORT_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.PROXY_USERNAME_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.PROXY_PASSWORD_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.SIGNER_SETTING);
registerSettingIfMissing(settingsModule, AwsEc2Service.REGION_SETTING);
// Register EC2 specific settings: cloud.aws.ec2
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.KEY_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SECRET_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.REGION_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING);
// Register EC2 discovery settings: discovery.ec2
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.HOST_TYPE_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.ANY_GROUP_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.GROUPS_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.NODE_CACHE_TIME_SETTING);
// Filter global settings
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.KEY);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.SECRET);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.PROXY_PASSWORD);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD);

settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey());
}
/**
* We manage potential duplicates between s3 and ec2 plugins (cloud.aws.xxx)
*/
private void registerSettingIfMissing(SettingsModule settingsModule, Setting<?> setting) {
if (settingsModule.exists(setting) == false) {
settingsModule.registerSetting(setting);
}
}
}

View File

@ -20,11 +20,24 @@
package org.elasticsearch.cloud.aws;

import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;

import static org.hamcrest.CoreMatchers.is;

public class AWSSignersTests extends ESTestCase {
/**
* Starts Ec2DiscoveryPlugin. It's a workaround when you run test from IntelliJ. Otherwise it generates
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
*/
@BeforeClass
public static void instantiatePlugin() {
new Ec2DiscoveryPlugin(Settings.EMPTY);
}
public void testSigners() {
assertThat(signerTester(null), is(false));
assertThat(signerTester("QueryStringSignerType"), is(true));

View File

@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;

import java.util.Collection;

/**
* Base class for AWS tests that require credentials.
* <p>
@ -42,7 +45,6 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.extendArray("plugin.types", Ec2DiscoveryPlugin.class.getName())
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1);
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
if (Strings.hasText(System.getProperty("tests.config"))) {
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
} else {
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
}
} catch (SettingsException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
}
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(Ec2DiscoveryPlugin.class);
}
}

View File

@ -19,11 +19,14 @@
package org.elasticsearch.discovery.ec2;

import com.amazonaws.Protocol;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.Ec2Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;

public class Ec2DiscoverySettingsTests extends ESTestCase {
@ -41,4 +44,71 @@ public class Ec2DiscoverySettingsTests extends ESTestCase {
assertThat(discoveryReady, is(false));
}
private static final Settings AWS = Settings.builder()
.put(AwsEc2Service.KEY_SETTING.getKey(), "global-key")
.put(AwsEc2Service.SECRET_SETTING.getKey(), "global-secret")
.put(AwsEc2Service.PROTOCOL_SETTING.getKey(), "https")
.put(AwsEc2Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
.put(AwsEc2Service.PROXY_PORT_SETTING.getKey(), 10000)
.put(AwsEc2Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
.put(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
.put(AwsEc2Service.SIGNER_SETTING.getKey(), "global-signer")
.put(AwsEc2Service.REGION_SETTING.getKey(), "global-region")
.build();
private static final Settings EC2 = Settings.builder()
.put(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey(), "ec2-key")
.put(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey(), "ec2-secret")
.put(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.getKey(), "http")
.put(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.getKey(), "ec2-proxy-host")
.put(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.getKey(), 20000)
.put(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.getKey(), "ec2-proxy-username")
.put(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey(), "ec2-proxy-password")
.put(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.getKey(), "ec2-signer")
.put(AwsEc2Service.CLOUD_EC2.REGION_SETTING.getKey(), "ec2-region")
.put(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.getKey(), "ec2-endpoint")
.build();
/**
* We test when only cloud.aws settings are set
*/
public void testRepositorySettingsGlobalOnly() {
Settings nodeSettings = buildSettings(AWS);
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("global-key"));
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("global-secret"));
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTPS));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("global-region"));
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by cloud.aws.ec2 settings
*/
public void testRepositorySettingsGlobalOverloadedByEC2() {
Settings nodeSettings = buildSettings(AWS, EC2);
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("ec2-key"));
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("ec2-secret"));
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTP));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("ec2-proxy-host"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("ec2-proxy-username"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("ec2-proxy-password"));
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("ec2-signer"));
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("ec2-region"));
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), is("ec2-endpoint"));
}
private Settings buildSettings(Settings... global) {
Settings.Builder builder = Settings.builder();
for (Settings settings : global) {
builder.put(settings);
}
return builder.build();
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.discovery.ec2;

import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
@ -95,7 +94,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPrivateIp() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "private_ip")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -111,7 +110,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPublicIp() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "public_ip")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -127,7 +126,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPrivateDns() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "private_dns")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -145,7 +144,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPublicDns() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "public_dns")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -162,7 +161,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testInvalidHostType() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "does_not_exist")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "does_not_exist")
.build();
try {
buildDynamicNodes(nodeSettings, 1);
@ -175,7 +174,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testFilterByTags() throws InterruptedException {
int nodes = randomIntBetween(5, 10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod")
.put(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod")
.build();

int prodInstances = 0;
@ -200,7 +199,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testFilterByMultipleTags() throws InterruptedException {
int nodes = randomIntBetween(5, 10);
Settings nodeSettings = Settings.builder()
.putArray(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod", "preprod")
.putArray(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod", "preprod")
.build();

int prodInstances = 0;
@ -252,7 +251,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testGetNodeListCached() throws Exception {
Settings.Builder builder = Settings.settingsBuilder()
.put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms");
.put(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.getKey(), "500ms");
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) {
@Override

View File

@ -23,7 +23,6 @@ package org.elasticsearch.discovery.ec2;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -39,8 +38,6 @@ import static org.hamcrest.CoreMatchers.is;
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
public void testMinimumMasterNodesStart() {
Settings nodeSettings = settingsBuilder()
.put("plugin.types", Ec2DiscoveryPlugin.class.getName())
.put("cloud.enabled", true)
.put("discovery.type", "ec2")
.build();
internalCluster().startNode(nodeSettings);

View File

@ -0,0 +1,48 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' at
# the root directory of all plugins.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# <arbitrary name1>.jar <-- classes, resources, dependencies
# <arbitrary nameN>.jar <-- any number of jars
# plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=2.0
# elasticsearch.version=2.0
# java.version=1.7
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=The S3 repository plugin adds S3 repositories.
#
# 'version': plugin's version
version=3.0.0-SNAPSHOT
#
# 'name': the plugin name
name=repository-s3
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin
#
# 'java.version' version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version' version of elasticsearch compiled against
elasticsearch.version=3.0.0-SNAPSHOT
#
### deprecated elements for jvm plugins :
#
# 'isolated': true if the plugin should have its own classloader.
# passing false is deprecated, and only intended to support plugins
# that have hard dependencies against each other. If this is
# not specified, then the plugin is isolated by default.
isolated=true
#

View File

@ -19,59 +19,132 @@
package org.elasticsearch.cloud.aws;

import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Setting;

import java.util.Locale;
import java.util.function.Function;
/**
*
*/
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {

final class CLOUD_AWS {
public static final String KEY = "cloud.aws.access_key";
public static final String SECRET = "cloud.aws.secret_key";
public static final String PROTOCOL = "cloud.aws.protocol";
public static final String PROXY_HOST = "cloud.aws.proxy.host";
public static final String PROXY_PORT = "cloud.aws.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
public static final String SIGNER = "cloud.aws.signer";
public static final String REGION = "cloud.aws.region";

// Global AWS settings (shared between discovery-ec2 and repository-s3)
// Each setting starting with `cloud.aws` also exists in discovery-ec2 project. Don't forget to update
// the code there if you change anything here.

/**
* cloud.aws.access_key: AWS Access key. Shared with discovery-ec2 plugin
*/
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);

/**
* cloud.aws.secret_key: AWS Secret key. Shared with discovery-ec2 plugin
*/
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.protocol: Protocol for AWS API: http or https. Defaults to https. Shared with discovery-ec2 plugin
*/
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)),
false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.host: In case of proxy, define its hostname/IP. Shared with discovery-ec2 plugin
*/
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.port: In case of proxy, define its port. Defaults to 80. Shared with discovery-ec2 plugin
*/
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.username: In case of proxy with auth, define the username. Shared with discovery-ec2 plugin
*/
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.proxy.password: In case of proxy with auth, define the password. Shared with discovery-ec2 plugin
*/
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.signer: If you are using an old AWS API version, you can define a Signer. Shared with discovery-ec2 plugin
*/
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
/**
* cloud.aws.region: Region. Shared with discovery-ec2 plugin
*/
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* Defines specific s3 settings starting with cloud.aws.s3.
*/
interface CLOUD_S3 {
/**
* cloud.aws.s3.access_key: AWS Access key specific for S3 API calls. Defaults to cloud.aws.access_key.
* @see AwsS3Service#KEY_SETTING
*/
Setting<String> KEY_SETTING =
new Setting<>("cloud.aws.s3.access_key", AwsS3Service.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.secret_key: AWS Secret key specific for S3 API calls. Defaults to cloud.aws.secret_key.
* @see AwsS3Service#SECRET_SETTING
*/
Setting<String> SECRET_SETTING =
new Setting<>("cloud.aws.s3.secret_key", AwsS3Service.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.protocol: Protocol for AWS API specific for S3 API calls: http or https. Defaults to cloud.aws.protocol.
* @see AwsS3Service#PROTOCOL_SETTING
*/
Setting<Protocol> PROTOCOL_SETTING =
new Setting<>("cloud.aws.s3.protocol", AwsS3Service.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.proxy.host: In case of proxy, define its hostname/IP specific for S3 API calls. Defaults to cloud.aws.proxy.host.
* @see AwsS3Service#PROXY_HOST_SETTING
*/
Setting<String> PROXY_HOST_SETTING =
new Setting<>("cloud.aws.s3.proxy.host", AwsS3Service.PROXY_HOST_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.proxy.port: In case of proxy, define its port specific for S3 API calls. Defaults to cloud.aws.proxy.port.
* @see AwsS3Service#PROXY_PORT_SETTING
*/
Setting<Integer> PROXY_PORT_SETTING =
new Setting<>("cloud.aws.s3.proxy.port", AwsS3Service.PROXY_PORT_SETTING,
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.s3.proxy.port"), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.proxy.username: In case of proxy with auth, define the username specific for S3 API calls.
* Defaults to cloud.aws.proxy.username.
* @see AwsS3Service#PROXY_USERNAME_SETTING
*/
Setting<String> PROXY_USERNAME_SETTING =
new Setting<>("cloud.aws.s3.proxy.username", AwsS3Service.PROXY_USERNAME_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.proxy.password: In case of proxy with auth, define the password specific for S3 API calls.
* Defaults to cloud.aws.proxy.password.
* @see AwsS3Service#PROXY_PASSWORD_SETTING
*/
Setting<String> PROXY_PASSWORD_SETTING =
new Setting<>("cloud.aws.s3.proxy.password", AwsS3Service.PROXY_PASSWORD_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.signer: If you are using an old AWS API version, you can define a Signer. Specific for S3 API calls.
* Defaults to cloud.aws.signer.
* @see AwsS3Service#SIGNER_SETTING
*/
Setting<String> SIGNER_SETTING =
new Setting<>("cloud.aws.s3.signer", AwsS3Service.SIGNER_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.region: Region specific for S3 API calls. Defaults to cloud.aws.region.
* @see AwsS3Service#REGION_SETTING
*/
Setting<String> REGION_SETTING =
new Setting<>("cloud.aws.s3.region", AwsS3Service.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false,
Setting.Scope.CLUSTER);
/**
* cloud.aws.s3.endpoint: Endpoint. If not set, endpoint will be guessed based on region setting.
*/
Setting<String> ENDPOINT_SETTING =
Setting.simpleString("cloud.aws.s3.endpoint", false, Setting.Scope.CLUSTER);
}

final class CLOUD_S3 {
public static final String KEY = "cloud.aws.s3.access_key";
public static final String SECRET = "cloud.aws.s3.secret_key";
public static final String PROTOCOL = "cloud.aws.s3.protocol";
public static final String PROXY_HOST = "cloud.aws.s3.proxy.host";
public static final String PROXY_PORT = "cloud.aws.s3.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.s3.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.s3.proxy.password";
public static final String SIGNER = "cloud.aws.s3.signer";
public static final String ENDPOINT = "cloud.aws.s3.endpoint";
}
final class REPOSITORY_S3 {
public static final String BUCKET = "repositories.s3.bucket";
public static final String ENDPOINT = "repositories.s3.endpoint";
public static final String PROTOCOL = "repositories.s3.protocol";
public static final String REGION = "repositories.s3.region";
public static final String SERVER_SIDE_ENCRYPTION = "repositories.s3.server_side_encryption";
public static final String BUFFER_SIZE = "repositories.s3.buffer_size";
public static final String MAX_RETRIES = "repositories.s3.max_retries";
public static final String CHUNK_SIZE = "repositories.s3.chunk_size";
public static final String COMPRESS = "repositories.s3.compress";
public static final String STORAGE_CLASS = "repositories.s3.storage_class";
public static final String CANNED_ACL = "repositories.s3.canned_acl";
public static final String BASE_PATH = "repositories.s3.base_path";
}
AmazonS3 client();
AmazonS3 client(String endpoint, String protocol, String region, String account, String key);
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);

AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries);
}
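The same fallback chain applies on the S3 side; a hedged sketch with arbitrary values:

    Settings settings = Settings.builder()
        .put("cloud.aws.protocol", "https")
        .put("cloud.aws.s3.protocol", "http")    // S3-specific override
        .build();
    Protocol forS3 = AwsS3Service.CLOUD_S3.PROTOCOL_SETTING.get(settings);  // Protocol.HTTP
    Protocol shared = AwsS3Service.PROTOCOL_SETTING.get(settings);          // Protocol.HTTPS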

View File

@ -31,16 +31,14 @@ import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
@ -51,7 +49,7 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
/**
* (acceskey, endpoint) -&gt; client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<>();

@Inject
public InternalAwsS3Service(Settings settings) {
@ -59,36 +57,23 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
} }
@Override @Override
public synchronized AmazonS3 client() { public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
String endpoint = getDefaultEndpoint(); if (Strings.isNullOrEmpty(endpoint)) {
String account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY)); // We need to set the endpoint based on the region
String key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET)); if (region != null) {
return getClient(endpoint, null, account, key, null);
}
@Override
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return client(endpoint, protocol, region, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
if (region != null && endpoint == null) {
endpoint = getEndpoint(region); endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} else if (endpoint == null) { } else {
// No region has been set so we will use the default endpoint
endpoint = getDefaultEndpoint(); endpoint = getDefaultEndpoint();
} }
if (account == null || key == null) {
account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY));
key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET));
} }
return getClient(endpoint, protocol, account, key, maxRetries); return getClient(endpoint, protocol, account, key, maxRetries);
} }
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) { private synchronized AmazonS3 getClient(String endpoint, Protocol protocol, String account, String key, Integer maxRetries) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account); Tuple<String, String> clientDescriptor = new Tuple<>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor); AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) { if (client != null) {
return client; return client;
@ -98,32 +83,13 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
// the response metadata cache is only there for diagnostics purposes, // the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation. // but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0); clientConfiguration.setResponseMetadataCacheSize(0);
if (protocol == null) { clientConfiguration.setProtocol(protocol);
protocol = settings.get(CLOUD_AWS.PROTOCOL, "https").toLowerCase(Locale.ROOT);
protocol = settings.get(CLOUD_S3.PROTOCOL, protocol).toLowerCase(Locale.ROOT);
}
if ("http".equals(protocol)) { String proxyHost = CLOUD_S3.PROXY_HOST_SETTING.get(settings);
clientConfiguration.setProtocol(Protocol.HTTP); if (Strings.hasText(proxyHost)) {
} else if ("https".equals(protocol)) { Integer proxyPort = CLOUD_S3.PROXY_PORT_SETTING.get(settings);
clientConfiguration.setProtocol(Protocol.HTTPS); String proxyUsername = CLOUD_S3.PROXY_USERNAME_SETTING.get(settings);
} else { String proxyPassword = CLOUD_S3.PROXY_PASSWORD_SETTING.get(settings);
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
proxyHost = settings.get(CLOUD_S3.PROXY_HOST, proxyHost);
if (proxyHost != null) {
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
portString = settings.get(CLOUD_S3.PROXY_PORT, portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
String proxyUsername = settings.get(CLOUD_S3.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
String proxyPassword = settings.get(CLOUD_S3.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));
clientConfiguration clientConfiguration
.withProxyHost(proxyHost) .withProxyHost(proxyHost)
@ -138,8 +104,8 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
} }
// #155: we might have 3rd party users using older S3 API version // #155: we might have 3rd party users using older S3 API version
String awsSigner = settings.get(CLOUD_S3.SIGNER, settings.get(CLOUD_AWS.SIGNER)); String awsSigner = CLOUD_S3.SIGNER_SETTING.get(settings);
if (awsSigner != null) { if (Strings.hasText(awsSigner)) {
logger.debug("using AWS API signer [{}]", awsSigner); logger.debug("using AWS API signer [{}]", awsSigner);
AwsSigner.configureSigner(awsSigner, clientConfiguration, endpoint); AwsSigner.configureSigner(awsSigner, clientConfiguration, endpoint);
} }
@ -168,11 +134,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
private String getDefaultEndpoint() { private String getDefaultEndpoint() {
String endpoint = null; String endpoint = null;
if (settings.get(CLOUD_S3.ENDPOINT) != null) { if (CLOUD_S3.ENDPOINT_SETTING.exists(settings)) {
endpoint = settings.get(CLOUD_S3.ENDPOINT); endpoint = CLOUD_S3.ENDPOINT_SETTING.get(settings);
logger.debug("using explicit s3 endpoint [{}]", endpoint); logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (settings.get(CLOUD_AWS.REGION) != null) { } else if (CLOUD_S3.REGION_SETTING.exists(settings)) {
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT); String region = CLOUD_S3.REGION_SETTING.get(settings);
endpoint = getEndpoint(region); endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} }
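As a hedged, self-contained illustration of the endpoint precedence implemented above (explicit cloud.aws.s3.endpoint first, then a region-derived endpoint, then the default), the helper below is not the plugin's actual code and its region-to-endpoint mapping is deliberately simplified:

    // Illustrative only: mirrors the order in which InternalAwsS3Service picks an endpoint.
    static String resolveEndpoint(String explicitEndpoint, String region) {
        if (explicitEndpoint != null && explicitEndpoint.isEmpty() == false) {
            return explicitEndpoint;                   // an explicit endpoint always wins
        }
        if (region != null && region.isEmpty() == false) {
            return "s3-" + region + ".amazonaws.com";  // simplified stand-in for getEndpoint(region)
        }
        return "s3.amazonaws.com";                     // neither endpoint nor region set: default endpoint
    }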

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module; import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -88,14 +89,79 @@ public class S3RepositoryPlugin extends Plugin {
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class); repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class);
} }
public void onModule(SettingsModule module) { public void onModule(SettingsModule settingsModule) {
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.KEY); // Register global cloud aws settings: cloud.aws (might have been registered in ec2 plugin)
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.SECRET); registerSettingIfMissing(settingsModule, AwsS3Service.KEY_SETTING);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.PROXY_PASSWORD); registerSettingIfMissing(settingsModule, AwsS3Service.SECRET_SETTING);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY); registerSettingIfMissing(settingsModule, AwsS3Service.PROTOCOL_SETTING);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET); registerSettingIfMissing(settingsModule, AwsS3Service.PROXY_HOST_SETTING);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD); registerSettingIfMissing(settingsModule, AwsS3Service.PROXY_PORT_SETTING);
module.registerSettingsFilter("access_key"); // WTF is this? registerSettingIfMissing(settingsModule, AwsS3Service.PROXY_USERNAME_SETTING);
module.registerSettingsFilter("secret_key"); // WTF is this? registerSettingIfMissing(settingsModule, AwsS3Service.PROXY_PASSWORD_SETTING);
registerSettingIfMissing(settingsModule, AwsS3Service.SIGNER_SETTING);
registerSettingIfMissing(settingsModule, AwsS3Service.REGION_SETTING);
// Register S3 specific settings: cloud.aws.s3
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.KEY_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SECRET_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SIGNER_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.REGION_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING);
// Register S3 repositories settings: repositories.s3
settingsModule.registerSetting(S3Repository.Repositories.KEY_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.SECRET_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BUCKET_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.REGION_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.ENDPOINT_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.PROTOCOL_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BUFFER_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.MAX_RETRIES_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CHUNK_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.COMPRESS_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BASE_PATH_SETTING);
// Register S3 single repository settings
settingsModule.registerSetting(S3Repository.Repository.KEY_SETTING);
settingsModule.registerSetting(S3Repository.Repository.SECRET_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BUCKET_SETTING);
settingsModule.registerSetting(S3Repository.Repository.ENDPOINT_SETTING);
settingsModule.registerSetting(S3Repository.Repository.PROTOCOL_SETTING);
settingsModule.registerSetting(S3Repository.Repository.REGION_SETTING);
settingsModule.registerSetting(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BUFFER_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repository.MAX_RETRIES_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CHUNK_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repository.COMPRESS_SETTING);
settingsModule.registerSetting(S3Repository.Repository.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BASE_PATH_SETTING);
// Filter global settings
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilter(S3Repository.Repository.KEY_SETTING.getKey());
settingsModule.registerSettingsFilter(S3Repository.Repository.SECRET_SETTING.getKey());
}
/**
* We manage potential duplicates between s3 and ec2 plugins (cloud.aws.xxx)
*/
private void registerSettingIfMissing(SettingsModule settingsModule, Setting<?> setting) {
if (settingsModule.exists(setting) == false) {
settingsModule.registerSetting(setting);
}
} }
} }

View File

@ -19,14 +19,15 @@
package org.elasticsearch.repositories.s3; package org.elasticsearch.repositories.s3;
import com.amazonaws.Protocol;
import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_AWS; import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_S3;
import org.elasticsearch.cloud.aws.AwsS3Service.REPOSITORY_S3;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -37,6 +38,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException; import java.io.IOException;
import java.util.Locale; import java.util.Locale;
import java.util.function.Function;
/** /**
* Shared file system implementation of the BlobStoreRepository * Shared file system implementation of the BlobStoreRepository
@ -55,6 +57,157 @@ public class S3Repository extends BlobStoreRepository {
public final static String TYPE = "s3"; public final static String TYPE = "s3";
/**
* Global S3 repositories settings. Starting with: repositories.s3
*/
public interface Repositories {
/**
* repositories.s3.access_key: AWS Access key specific for all S3 Repositories API calls. Defaults to cloud.aws.s3.access_key.
* @see CLOUD_S3#KEY_SETTING
*/
Setting<String> KEY_SETTING = new Setting<>("repositories.s3.access_key", CLOUD_S3.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.secret_key: AWS Secret key specific for all S3 Repositories API calls. Defaults to cloud.aws.s3.secret_key.
* @see CLOUD_S3#SECRET_SETTING
*/
Setting<String> SECRET_SETTING = new Setting<>("repositories.s3.secret_key", CLOUD_S3.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.region: Region specific for all S3 Repositories API calls. Defaults to cloud.aws.s3.region.
* @see CLOUD_S3#REGION_SETTING
*/
Setting<String> REGION_SETTING = new Setting<>("repositories.s3.region", CLOUD_S3.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.endpoint: Endpoint specific for all S3 Repositories API calls. Defaults to cloud.aws.s3.endpoint.
* @see CLOUD_S3#ENDPOINT_SETTING
*/
Setting<String> ENDPOINT_SETTING = new Setting<>("repositories.s3.endpoint", CLOUD_S3.ENDPOINT_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.protocol: Protocol specific for all S3 Repositories API calls. Defaults to cloud.aws.s3.protocol.
* @see CLOUD_S3#PROTOCOL_SETTING
*/
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("repositories.s3.protocol", CLOUD_S3.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.bucket: The name of the bucket to be used for snapshots.
*/
Setting<String> BUCKET_SETTING = Setting.simpleString("repositories.s3.bucket", false, Setting.Scope.CLUSTER);
/**
* repositories.s3.server_side_encryption: When set to true files are encrypted on server side using AES256 algorithm.
* Defaults to false.
*/
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("repositories.s3.server_side_encryption", false, false, Setting.Scope.CLUSTER);
/**
* repositories.s3.buffer_size: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
* to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevent the
* use of the Multipart API and may result in upload errors. Defaults to 5mb.
*/
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
/**
* repositories.s3.max_retries: Number of retries in case of S3 errors. Defaults to 3.
*/
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("repositories.s3.max_retries", 3, false, Setting.Scope.CLUSTER);
/**
* repositories.s3.chunk_size: Big files can be broken down into chunks during snapshotting if needed. Defaults to 100m.
*/
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB), false, Setting.Scope.CLUSTER);
/**
* repositories.s3.compress: When set to true metadata files are stored in compressed format. This setting doesn't affect index
* files that are already compressed by default. Defaults to false.
*/
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("repositories.s3.compress", false, false, Setting.Scope.CLUSTER);
/**
* repositories.s3.storage_class: Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia. Defaults to standard.
*/
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("repositories.s3.storage_class", false, Setting.Scope.CLUSTER);
/**
* repositories.s3.canned_acl: The S3 repository supports all S3 canned ACLs: private, public-read, public-read-write,
* authenticated-read, log-delivery-write, bucket-owner-read, bucket-owner-full-control. Defaults to private.
*/
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("repositories.s3.canned_acl", false, Setting.Scope.CLUSTER);
/**
* repositories.s3.base_path: Specifies the path within bucket to repository data. Defaults to root directory.
*/
Setting<String> BASE_PATH_SETTING = Setting.simpleString("repositories.s3.base_path", false, Setting.Scope.CLUSTER);
}
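A hedged sketch of how these node-level defaults resolve, using only the Setting API visible in this patch: repositories.s3.access_key falls back to cloud.aws.s3.access_key (and, through it, to cloud.aws.access_key) when it is not set explicitly. Keys are taken from the patch, values are illustrative:

    Settings nodeSettings = Settings.builder()
            .put("cloud.aws.s3.access_key", "s3-key")       // only the cloud.aws.s3 level is set
            .put("repositories.s3.bucket", "my-snapshots")
            .build();

    // Resolved through the Setting fallback chain because repositories.s3.access_key is unset.
    String key = S3Repository.Repositories.KEY_SETTING.get(nodeSettings);        // "s3-key"
    String bucket = S3Repository.Repositories.BUCKET_SETTING.get(nodeSettings);  // "my-snapshots"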
/**
* Per-repository S3 settings. Same settings as the Repositories settings but without the repositories.s3 prefix.
* If undefined, they use the repositories.s3.xxx equivalent setting.
*/
public interface Repository {
/**
* access_key
* @see Repositories#KEY_SETTING
*/
Setting<String> KEY_SETTING = Setting.simpleString("access_key", false, Setting.Scope.CLUSTER);
/**
* secret_key
* @see Repositories#SECRET_SETTING
*/
Setting<String> SECRET_SETTING = Setting.simpleString("secret_key", false, Setting.Scope.CLUSTER);
/**
* bucket
* @see Repositories#BUCKET_SETTING
*/
Setting<String> BUCKET_SETTING = Setting.simpleString("bucket", false, Setting.Scope.CLUSTER);
/**
* endpoint
* @see Repositories#ENDPOINT_SETTING
*/
Setting<String> ENDPOINT_SETTING = Setting.simpleString("endpoint", false, Setting.Scope.CLUSTER);
/**
* protocol
* @see Repositories#PROTOCOL_SETTING
*/
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
/**
* region
* @see Repositories#REGION_SETTING
*/
Setting<String> REGION_SETTING = new Setting<>("region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
/**
* server_side_encryption
* @see Repositories#SERVER_SIDE_ENCRYPTION_SETTING
*/
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false, false, Setting.Scope.CLUSTER);
/**
* buffer_size
* @see Repositories#BUFFER_SIZE_SETTING
*/
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
/**
* max_retries
* @see Repositories#MAX_RETRIES_SETTING
*/
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("max_retries", 3, false, Setting.Scope.CLUSTER);
/**
* chunk_size
* @see Repositories#CHUNK_SIZE_SETTING
*/
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("chunk_size", "-1", false, Setting.Scope.CLUSTER);
/**
* compress
* @see Repositories#COMPRESS_SETTING
*/
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, false, Setting.Scope.CLUSTER);
/**
* storage_class
* @see Repositories#STORAGE_CLASS_SETTING
*/
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("storage_class", false, Setting.Scope.CLUSTER);
/**
* canned_acl
* @see Repositories#CANNED_ACL_SETTING
*/
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("canned_acl", false, Setting.Scope.CLUSTER);
/**
* base_path
* @see Repositories#BASE_PATH_SETTING
*/
Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path", false, Setting.Scope.CLUSTER);
}
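For the per-repository level, a hedged sketch of the prefix-free keys; in practice these values arrive through the repository creation request, but a plain Settings object shows the same resolution. Names and values below are illustrative:

    Settings perRepositorySettings = Settings.builder()
            .put("bucket", "my-snapshots")
            .put("base_path", "prod/cluster-1")
            .put("server_side_encryption", true)
            .build();

    String bucket = S3Repository.Repository.BUCKET_SETTING.get(perRepositorySettings);                    // "my-snapshots"
    boolean encrypted = S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.get(perRepositorySettings); // true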
private final S3BlobStore blobStore; private final S3BlobStore blobStore;
private final BlobPath basePath; private final BlobPath basePath;
@ -75,62 +228,40 @@ public class S3Repository extends BlobStoreRepository {
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException { public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository); super(name.getName(), repositorySettings, indexShardRepository);
String bucket = repositorySettings.settings().get("bucket", settings.get(REPOSITORY_S3.BUCKET)); String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
if (bucket == null) { if (bucket == null) {
throw new RepositoryException(name.name(), "No bucket defined for s3 gateway"); throw new RepositoryException(name.name(), "No bucket defined for s3 gateway");
} }
String endpoint = repositorySettings.settings().get("endpoint", settings.get(REPOSITORY_S3.ENDPOINT)); String endpoint = getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING);
String protocol = repositorySettings.settings().get("protocol", settings.get(REPOSITORY_S3.PROTOCOL)); Protocol protocol = getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING);
String region = getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING);
String region = repositorySettings.settings().get("region", settings.get(REPOSITORY_S3.REGION)); // If no region is defined either in region, repositories.s3.region, cloud.aws.s3.region or cloud.aws.region
if (region == null) { // we fallback to Default bucket - null
// InternalBucket setting is not set - use global region setting if (Strings.isEmpty(region)) {
String regionSetting = settings.get(CLOUD_AWS.REGION);
if (regionSetting != null) {
regionSetting = regionSetting.toLowerCase(Locale.ENGLISH);
if ("us-east".equals(regionSetting) || "us-east-1".equals(regionSetting)) {
// Default bucket - setting region to null
region = null; region = null;
} else if ("us-west".equals(regionSetting) || "us-west-1".equals(regionSetting)) {
region = "us-west-1";
} else if ("us-west-2".equals(regionSetting)) {
region = "us-west-2";
} else if ("ap-southeast".equals(regionSetting) || "ap-southeast-1".equals(regionSetting)) {
region = "ap-southeast-1";
} else if ("ap-southeast-2".equals(regionSetting)) {
region = "ap-southeast-2";
} else if ("ap-northeast".equals(regionSetting) || "ap-northeast-1".equals(regionSetting)) {
region = "ap-northeast-1";
} else if ("eu-west".equals(regionSetting) || "eu-west-1".equals(regionSetting)) {
region = "eu-west-1";
} else if ("eu-central".equals(regionSetting) || "eu-central-1".equals(regionSetting)) {
region = "eu-central-1";
} else if ("sa-east".equals(regionSetting) || "sa-east-1".equals(regionSetting)) {
region = "sa-east-1";
} else if ("cn-north".equals(regionSetting) || "cn-north-1".equals(regionSetting)) {
region = "cn-north-1";
}
}
} }
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", settings.getAsBoolean(REPOSITORY_S3.SERVER_SIDE_ENCRYPTION, false)); boolean serverSideEncryption = getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", settings.getAsBytesSize(REPOSITORY_S3.BUFFER_SIZE, null)); ByteSizeValue bufferSize = getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", settings.getAsInt(REPOSITORY_S3.MAX_RETRIES, 3)); Integer maxRetries = getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize(REPOSITORY_S3.CHUNK_SIZE, new ByteSizeValue(100, ByteSizeUnit.MB))); this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean(REPOSITORY_S3.COMPRESS, false)); this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
// Parse and validate the user's S3 Storage Class setting // Parse and validate the user's S3 Storage Class setting
String storageClass = repositorySettings.settings().get("storage_class", settings.get(REPOSITORY_S3.STORAGE_CLASS, null)); String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = repositorySettings.settings().get("canned_acl", settings.get(REPOSITORY_S3.CANNED_ACL, null)); String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]", logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass); bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, key, secret, maxRetries),
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass); bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
String basePath = repositorySettings.settings().get("base_path", settings.get(REPOSITORY_S3.BASE_PATH)); String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) { if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath(); BlobPath path = new BlobPath();
for(String elem : Strings.splitStringToArray(basePath, '/')) { for(String elem : Strings.splitStringToArray(basePath, '/')) {
@ -171,4 +302,13 @@ public class S3Repository extends BlobStoreRepository {
return chunkSize; return chunkSize;
} }
public static <T> T getValue(RepositorySettings repositorySettings,
Setting<T> repositorySetting,
Setting<T> repositoriesSetting) {
if (repositorySetting.exists(repositorySettings.settings())) {
return repositorySetting.get(repositorySettings.settings());
} else {
return repositoriesSetting.get(repositorySettings.globalSettings());
}
}
} }
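A hedged usage sketch of getValue, matching the fallback order exercised by the tests further down: the per-repository setting wins when present, otherwise the repositories.s3.* value (or its default) is read from the node-level settings. Values are illustrative:

    Settings nodeSettings = Settings.builder()
            .put("repositories.s3.max_retries", 4)
            .build();
    Settings perRepositorySettings = Settings.builder()
            .put("max_retries", 5)
            .build();

    // The per-repository value takes precedence ...
    int retries = S3Repository.getValue(new RepositorySettings(nodeSettings, perRepositorySettings),
            S3Repository.Repository.MAX_RETRIES_SETTING, S3Repository.Repositories.MAX_RETRIES_SETTING);  // 5

    // ... otherwise the repositories.s3.* node-level value is used.
    int fallbackRetries = S3Repository.getValue(new RepositorySettings(nodeSettings, Settings.EMPTY),
            S3Repository.Repository.MAX_RETRIES_SETTING, S3Repository.Repositories.MAX_RETRIES_SETTING);  // 4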

View File

@ -20,11 +20,24 @@
package org.elasticsearch.cloud.aws; package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
public class AWSSignersTests extends ESTestCase { public class AWSSignersTests extends ESTestCase {
/**
* Starts S3RepositoryPlugin. It's a workaround when you run tests from IntelliJ. Otherwise it generates
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
*/
@BeforeClass
public static void instantiatePlugin() {
new S3RepositoryPlugin();
}
public void testSigners() { public void testSigners() {
assertThat(signerTester(null), is(false)); assertThat(signerTester(null), is(false));
assertThat(signerTester("QueryStringSignerType"), is(true)); assertThat(signerTester("QueryStringSignerType"), is(true));

View File

@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin; import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty; import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import java.util.Collection;
/** /**
* Base class for AWS tests that require credentials. * Base class for AWS tests that require credentials.
* <p> * <p>
@ -42,7 +45,6 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
Settings.Builder settings = Settings.builder() Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal)) .put(super.nodeSettings(nodeOrdinal))
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.extendArray("plugin.types", S3RepositoryPlugin.class.getName(), TestAwsS3Service.TestPlugin.class.getName())
.put("cloud.aws.test.random", randomInt()) .put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1) .put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1); .put("cloud.aws.test.read_failures", 0.1);
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
if (Strings.hasText(System.getProperty("tests.config"))) { if (Strings.hasText(System.getProperty("tests.config"))) {
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config"))); settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
} else { } else {
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml"); throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
} }
} catch (SettingsException exception) { } catch (SettingsException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception); throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
} }
return settings.build(); return settings.build();
} }
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(S3RepositoryPlugin.class);
}
} }

View File

@ -0,0 +1,302 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.test.ESTestCase;
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
public class RepositoryS3SettingsTests extends ESTestCase {
private static final Settings AWS = Settings.builder()
.put(AwsS3Service.KEY_SETTING.getKey(), "global-key")
.put(AwsS3Service.SECRET_SETTING.getKey(), "global-secret")
.put(AwsS3Service.PROTOCOL_SETTING.getKey(), "https")
.put(AwsS3Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
.put(AwsS3Service.PROXY_PORT_SETTING.getKey(), 10000)
.put(AwsS3Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
.put(AwsS3Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
.put(AwsS3Service.SIGNER_SETTING.getKey(), "global-signer")
.put(AwsS3Service.REGION_SETTING.getKey(), "global-region")
.build();
private static final Settings S3 = Settings.builder()
.put(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey(), "s3-key")
.put(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey(), "s3-secret")
.put(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING.getKey(), "http")
.put(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.getKey(), "s3-proxy-host")
.put(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.getKey(), 20000)
.put(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.getKey(), "s3-proxy-username")
.put(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey(), "s3-proxy-password")
.put(AwsS3Service.CLOUD_S3.SIGNER_SETTING.getKey(), "s3-signer")
.put(AwsS3Service.CLOUD_S3.REGION_SETTING.getKey(), "s3-region")
.put(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING.getKey(), "s3-endpoint")
.build();
private static final Settings REPOSITORIES = Settings.builder()
.put(Repositories.KEY_SETTING.getKey(), "repositories-key")
.put(Repositories.SECRET_SETTING.getKey(), "repositories-secret")
.put(Repositories.BUCKET_SETTING.getKey(), "repositories-bucket")
.put(Repositories.PROTOCOL_SETTING.getKey(), "https")
.put(Repositories.REGION_SETTING.getKey(), "repositories-region")
.put(Repositories.ENDPOINT_SETTING.getKey(), "repositories-endpoint")
.put(Repositories.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
.put(Repositories.BUFFER_SIZE_SETTING.getKey(), "6mb")
.put(Repositories.MAX_RETRIES_SETTING.getKey(), 4)
.put(Repositories.CHUNK_SIZE_SETTING.getKey(), "110mb")
.put(Repositories.COMPRESS_SETTING.getKey(), true)
.put(Repositories.STORAGE_CLASS_SETTING.getKey(), "repositories-class")
.put(Repositories.CANNED_ACL_SETTING.getKey(), "repositories-acl")
.put(Repositories.BASE_PATH_SETTING.getKey(), "repositories-basepath")
.build();
private static final Settings REPOSITORY = Settings.builder()
.put(Repository.KEY_SETTING.getKey(), "repository-key")
.put(Repository.SECRET_SETTING.getKey(), "repository-secret")
.put(Repository.BUCKET_SETTING.getKey(), "repository-bucket")
.put(Repository.PROTOCOL_SETTING.getKey(), "https")
.put(Repository.REGION_SETTING.getKey(), "repository-region")
.put(Repository.ENDPOINT_SETTING.getKey(), "repository-endpoint")
.put(Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), false)
.put(Repository.BUFFER_SIZE_SETTING.getKey(), "7mb")
.put(Repository.MAX_RETRIES_SETTING.getKey(), 5)
.put(Repository.CHUNK_SIZE_SETTING.getKey(), "120mb")
.put(Repository.COMPRESS_SETTING.getKey(), false)
.put(Repository.STORAGE_CLASS_SETTING.getKey(), "repository-class")
.put(Repository.CANNED_ACL_SETTING.getKey(), "repository-acl")
.put(Repository.BASE_PATH_SETTING.getKey(), "repository-basepath")
.build();
/**
* We test when only cloud.aws settings are set
*/
public void testRepositorySettingsGlobalOnly() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("global-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("global-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("global-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), isEmptyString());
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by cloud.aws.s3 settings
*/
public void testRepositorySettingsGlobalOverloadedByS3() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("s3-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("s3-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTP));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("s3-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("s3-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by repositories.s3 settings
*/
public void testRepositorySettingsGlobalOverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* We test when cloud.aws.s3 settings are overloaded by repositories.s3 settings
*/
public void testRepositorySettingsS3OverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* We test when cloud.aws settings are overloaded by single repository settings
*/
public void testRepositorySettingsGlobalOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* We test when cloud.aws.s3 settings are overloaded by single repository settings
*/
public void testRepositorySettingsS3OverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* We test when repositories settings are overloaded by single repository settings
*/
public void testRepositorySettingsRepositoriesOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
private Settings buildSettings(Settings... global) {
Settings.Builder builder = Settings.builder();
for (Settings settings : global) {
builder.put(settings);
}
return builder.build();
}
}

View File

@ -18,11 +18,11 @@
*/ */
package org.elasticsearch.cloud.aws; package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
@ -51,17 +51,7 @@ public class TestAwsS3Service extends InternalAwsS3Service {
@Override @Override
public synchronized AmazonS3 client() { public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client());
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries)); return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
} }

View File

@ -19,6 +19,7 @@
 package org.elasticsearch.repositories.s3;

+import com.amazonaws.Protocol;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@ -32,14 +33,12 @@ import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
 import org.elasticsearch.cloud.aws.AwsS3Service;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.store.MockFSDirectoryService;

 import org.junit.After;
 import org.junit.Before;
@ -54,43 +53,43 @@ import static org.hamcrest.Matchers.notNullValue;
 */
 @ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, transportClientRatio = 0.0)
 abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase {

     @Override
-    public Settings indexSettings() {
-        // During restore we frequently restore index to exactly the same state it was before, that might cause the same
-        // checksum file to be written twice during restore operation
-        return Settings.builder().put(super.indexSettings())
-                .put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE_SETTING.getKey(), false)
-                .put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE_SETTING.getKey(), false)
-                .put("cloud.enabled", true)
-                .put("plugin.types", S3RepositoryPlugin.class.getName())
-                .put("repositories.s3.base_path", basePath)
+    public Settings nodeSettings(int nodeOrdinal) {
+        // nodeSettings is called before `wipeBefore()` so we need to define basePath here
+        globalBasePath = "repo-" + randomInt();
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal))
+                .put(S3Repository.Repositories.BASE_PATH_SETTING.getKey(), globalBasePath)
                 .build();
     }

     private String basePath;
+    private String globalBasePath;

     @Before
     public final void wipeBefore() {
         wipeRepositories();
         basePath = "repo-" + randomInt();
         cleanRepositoryFiles(basePath);
+        cleanRepositoryFiles(globalBasePath);
     }

     @After
     public final void wipeAfter() {
         wipeRepositories();
         cleanRepositoryFiles(basePath);
+        cleanRepositoryFiles(globalBasePath);
     }

     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
     public void testSimpleWorkflow() {
         Client client = client();
         Settings.Builder settings = Settings.settingsBuilder()
-                .put("chunk_size", randomIntBetween(1000, 10000));
+                .put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000));

         // We sometime test getting the base_path from node settings using repositories.s3.base_path
         if (usually()) {
-            settings.put("base_path", basePath);
+            settings.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath);
         }
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
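
The raw string keys ("chunk_size", "base_path", ...) give way to typed Setting constants whose getKey() is used wherever a key is needed. A minimal sketch of what such declarations look like, using the Setting.simpleString and Setting.boolSetting factories; the names mirror the constants the test references, but the declarations, scopes, and flags shown here are illustrative rather than copied from S3Repository:

    Setting<String> BASE_PATH_SETTING =
            Setting.simpleString("base_path", false, Setting.Scope.CLUSTER);
    Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING =
            Setting.boolSetting("server_side_encryption", false, false, Setting.Scope.CLUSTER);

    // A test (or the repository itself) then refers to the declared key instead of a literal:
    Settings repositorySettings = Settings.settingsBuilder()
            .put(BASE_PATH_SETTING.getKey(), "repo-42")              // placeholder path
            .put(SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
            .build();

Declaring each key once keeps tests and production code in sync if a key is ever renamed, and removes the scattered string literals the old version relied on.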
@ -166,9 +165,9 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("base_path", basePath)
-                        .put("chunk_size", randomIntBetween(1000, 10000))
-                        .put("server_side_encryption", true)
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                        .put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000))
+                        .put(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -197,10 +196,11 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         Settings bucket = settings.getByPrefix("repositories.s3.");
         AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
                 null,
-                null,
-                bucket.get("region", settings.get("repositories.s3.region")),
-                bucket.get("access_key", settings.get("cloud.aws.access_key")),
-                bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
+                S3Repository.Repositories.PROTOCOL_SETTING.get(settings),
+                S3Repository.Repositories.REGION_SETTING.get(settings),
+                S3Repository.Repositories.KEY_SETTING.get(settings),
+                S3Repository.Repositories.SECRET_SETTING.get(settings),
+                null);

         String bucketName = bucket.get("bucket");
         logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
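
Once the client is built from the node-level Repositories settings, the test verifies that the uploaded blobs were stored with server-side encryption. A hedged sketch (not taken from the diff) of how such a check can be expressed with the AWS SDK, reusing the s3Client and bucketName from the code above; the object key is a placeholder:

    // S3 reports the server-side-encryption algorithm on each object's metadata;
    // "AES256" is the value returned for SSE-S3 encrypted objects.
    ObjectMetadata metadata = s3Client.getObjectMetadata(bucketName, basePath + "/placeholder-blob");
    assertThat(metadata.getSSEAlgorithm(), equalTo("AES256"));

(ObjectMetadata comes from com.amazonaws.services.s3.model.)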
@ -260,25 +260,36 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         try {
             client.admin().cluster().preparePutRepository("test-repo")
                     .setType("s3").setSettings(Settings.settingsBuilder()
-                            .put("base_path", basePath)
-                            .put("bucket", bucketSettings.get("bucket"))
+                            .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                            .put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
                     ).get();
             fail("repository verification should have raise an exception!");
         } catch (RepositoryVerificationException e) {
         }
     }

+    public void testRepositoryWithBasePath() {
+        Client client = client();
+        PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+                .setType("s3").setSettings(Settings.settingsBuilder()
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                ).get();
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+
+        assertRepositoryIsOperational(client, "test-repo");
+    }
+
     public void testRepositoryWithCustomCredentials() {
         Client client = client();
         Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("base_path", basePath)
-                        .put("region", bucketSettings.get("region"))
-                        .put("access_key", bucketSettings.get("access_key"))
-                        .put("secret_key", bucketSettings.get("secret_key"))
-                        .put("bucket", bucketSettings.get("bucket"))
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                        .put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
+                        .put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
+                        .put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
+                        .put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -292,11 +303,11 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         logger.info("--> creating s3 repostoriy with endpoint [{}], bucket[{}] and path [{}]", bucketSettings.get("endpoint"), bucketSettings.get("bucket"), basePath);
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("bucket", bucketSettings.get("bucket"))
-                        .put("endpoint", bucketSettings.get("endpoint"))
-                        .put("access_key", bucketSettings.get("access_key"))
-                        .put("secret_key", bucketSettings.get("secret_key"))
-                        .put("base_path", basePath)
+                        .put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
+                        .put(S3Repository.Repository.ENDPOINT_SETTING.getKey(), bucketSettings.get("endpoint"))
+                        .put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
+                        .put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
         assertRepositoryIsOperational(client, "test-repo");
@ -313,8 +324,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         try {
             client.admin().cluster().preparePutRepository("test-repo")
                     .setType("s3").setSettings(Settings.settingsBuilder()
-                            .put("base_path", basePath)
-                            .put("bucket", bucketSettings.get("bucket"))
+                            .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                            .put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
                             // Below setting intentionally omitted to assert bucket is not available in default region.
                             // .put("region", privateBucketSettings.get("region"))
                     ).get();
@ -331,9 +342,9 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("base_path", basePath)
-                        .put("bucket", bucketSettings.get("bucket"))
-                        .put("region", bucketSettings.get("region"))
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
+                        .put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
+                        .put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -348,7 +359,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("base_path", basePath)
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -369,7 +380,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         logger.info("--> creating s3 repository without any path");
         PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo")
                 .setType("s3").setSettings(Settings.settingsBuilder()
-                        .put("base_path", basePath)
+                        .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
                 ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -454,17 +465,17 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
                 settings.getByPrefix("repositories.s3.external-bucket.")
         };
         for (Settings bucket : buckets) {
-            String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
-            String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));
-            String region = bucket.get("region", settings.get("repositories.s3.region"));
-            String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
-            String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
+            String endpoint = bucket.get("endpoint", S3Repository.Repositories.ENDPOINT_SETTING.get(settings));
+            Protocol protocol = S3Repository.Repositories.PROTOCOL_SETTING.get(settings);
+            String region = bucket.get("region", S3Repository.Repositories.REGION_SETTING.get(settings));
+            String accessKey = bucket.get("access_key", S3Repository.Repositories.KEY_SETTING.get(settings));
+            String secretKey = bucket.get("secret_key", S3Repository.Repositories.SECRET_SETTING.get(settings));
             String bucketName = bucket.get("bucket");

             // We check that settings has been set in elasticsearch.yml integration test file
             // as described in README
             assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
-            AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
+            AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey, null);
             try {
                 ObjectListing prevListing = null;
                 //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html