Migrate AWS settings to new settings infrastructure

Closes #16293.
This commit is contained in:
David Pilato 2016-01-29 18:05:18 +01:00
parent 65391e8a83
commit fb7723c186
20 changed files with 876 additions and 344 deletions

View File

@ -19,42 +19,79 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import com.amazonaws.services.ec2.AmazonEC2;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.component.LifecycleComponent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {
final class CLOUD_AWS {
public static final String KEY = "cloud.aws.access_key";
public static final String SECRET = "cloud.aws.secret_key";
public static final String PROTOCOL = "cloud.aws.protocol";
public static final String PROXY_HOST = "cloud.aws.proxy.host";
public static final String PROXY_PORT = "cloud.aws.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
public static final String SIGNER = "cloud.aws.signer";
public static final String REGION = "cloud.aws.region";
public interface AwsEc2Service {
Setting<Boolean> AUTO_ATTRIBUTE_SETTING = Setting.boolSetting("cloud.node.auto_attributes", false, false, Setting.Scope.CLUSTER);
// Global AWS settings (shared between discovery-ec2 and repository-s3)
// Each setting starting with `cloud.aws` also exists in repository-s3 project. Don't forget to update
// the code there if you change anything here.
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)),
false, Setting.Scope.CLUSTER);
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
interface CLOUD_EC2 {
Setting<String> KEY_SETTING = new Setting<>("cloud.aws.ec2.access_key", AwsEc2Service.KEY_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING = new Setting<>("cloud.aws.ec2.secret_key", AwsEc2Service.SECRET_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.ec2.protocol", AwsEc2Service.PROTOCOL_SETTING,
s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
Setting<String> PROXY_HOST_SETTING = new Setting<>("cloud.aws.ec2.proxy.host", AwsEc2Service.PROXY_HOST_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
Setting<Integer> PROXY_PORT_SETTING = new Setting<>("cloud.aws.ec2.proxy.port", AwsEc2Service.PROXY_PORT_SETTING,
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.ec2.proxy.port"), false, Setting.Scope.CLUSTER);
Setting<String> PROXY_USERNAME_SETTING = new Setting<>("cloud.aws.ec2.proxy.username", AwsEc2Service.PROXY_USERNAME_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> PROXY_PASSWORD_SETTING = new Setting<>("cloud.aws.ec2.proxy.password", AwsEc2Service.PROXY_PASSWORD_SETTING,
Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> SIGNER_SETTING = new Setting<>("cloud.aws.ec2.signer", AwsEc2Service.SIGNER_SETTING, Function.identity(),
false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.ec2.region", AwsEc2Service.REGION_SETTING,
s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
Setting<String> ENDPOINT_SETTING = Setting.simpleString("cloud.aws.ec2.endpoint", false, Setting.Scope.CLUSTER);
}
final class CLOUD_EC2 {
public static final String KEY = "cloud.aws.ec2.access_key";
public static final String SECRET = "cloud.aws.ec2.secret_key";
public static final String PROTOCOL = "cloud.aws.ec2.protocol";
public static final String PROXY_HOST = "cloud.aws.ec2.proxy.host";
public static final String PROXY_PORT = "cloud.aws.ec2.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.ec2.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.ec2.proxy.password";
public static final String SIGNER = "cloud.aws.ec2.signer";
public static final String ENDPOINT = "cloud.aws.ec2.endpoint";
}
interface DISCOVERY_EC2 {
enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}
final class DISCOVERY_EC2 {
public static final String HOST_TYPE = "discovery.ec2.host_type";
public static final String ANY_GROUP = "discovery.ec2.any_group";
public static final String GROUPS = "discovery.ec2.groups";
public static final String TAG_PREFIX = "discovery.ec2.tag.";
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time";
Setting<HostType> HOST_TYPE_SETTING =
new Setting<>("discovery.ec2.host_type", HostType.PRIVATE_IP.name(), s -> HostType.valueOf(s.toUpperCase(Locale.ROOT)), false,
Setting.Scope.CLUSTER);
Setting<Boolean> ANY_GROUP_SETTING =
Setting.boolSetting("discovery.ec2.any_group", true, false, Setting.Scope.CLUSTER);
Setting<List<String>> GROUPS_SETTING =
Setting.listSetting("discovery.ec2.groups", new ArrayList<>(), s -> s.toString(), false, Setting.Scope.CLUSTER);
Setting<List<String>> AVAILABILITY_ZONES_SETTING =
Setting.listSetting("discovery.ec2.availability_zones", Collections.emptyList(), s -> s.toString(), false,
Setting.Scope.CLUSTER);
Setting<TimeValue> NODE_CACHE_TIME_SETTING =
Setting.timeSetting("discovery.ec2.node_cache_time", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
Setting<Settings> TAG_SETTING = Setting.groupSetting("discovery.ec2.tag.", false,Setting.Scope.CLUSTER);
}
AmazonEC2 client();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.BasicAWSCredentials;
@ -33,18 +32,17 @@ import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import java.util.Locale;
import java.util.Random;
/**
@ -74,30 +72,15 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
String protocol = settings.get(CLOUD_EC2.PROTOCOL, settings.get(CLOUD_AWS.PROTOCOL, "https")).toLowerCase(Locale.ROOT);
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = settings.get(CLOUD_EC2.KEY, settings.get(CLOUD_AWS.KEY));
String key = settings.get(CLOUD_EC2.SECRET, settings.get(CLOUD_AWS.SECRET));
clientConfiguration.setProtocol(CLOUD_EC2.PROTOCOL_SETTING.get(settings));
String key = CLOUD_EC2.KEY_SETTING.get(settings);
String secret = CLOUD_EC2.SECRET_SETTING.get(settings);
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
proxyHost = settings.get(CLOUD_EC2.PROXY_HOST, proxyHost);
String proxyHost = CLOUD_EC2.PROXY_HOST_SETTING.get(settings);
if (proxyHost != null) {
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
portString = settings.get(CLOUD_EC2.PROXY_PORT, portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
String proxyUsername = settings.get(CLOUD_EC2.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
String proxyPassword = settings.get(CLOUD_EC2.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));
Integer proxyPort = CLOUD_EC2.PROXY_PORT_SETTING.get(settings);
String proxyUsername = CLOUD_EC2.PROXY_USERNAME_SETTING.get(settings);
String proxyPassword = CLOUD_EC2.PROXY_PASSWORD_SETTING.get(settings);
clientConfiguration
.withProxyHost(proxyHost)
@ -107,15 +90,10 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
}
// #155: we might have 3rd party users using older EC2 API version
String awsSigner = settings.get(CLOUD_EC2.SIGNER, settings.get(CLOUD_AWS.SIGNER));
if (awsSigner != null) {
String awsSigner = CLOUD_EC2.SIGNER_SETTING.get(settings);
if (Strings.hasText(awsSigner)) {
logger.debug("using AWS API signer [{}]", awsSigner);
try {
AwsSigner.configureSigner(awsSigner, clientConfiguration);
} catch (IllegalArgumentException e) {
logger.warn("wrong signer set for [{}] or [{}]: [{}]",
CLOUD_EC2.SIGNER, CLOUD_AWS.SIGNER, awsSigner);
}
AwsSigner.configureSigner(awsSigner, clientConfiguration);
}
// Increase the number of retries in case of 5xx API responses
@ -138,7 +116,7 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
AWSCredentialsProvider credentials;
if (account == null && key == null) {
if (key == null && secret == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
@ -146,19 +124,18 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
new StaticCredentialsProvider(new BasicAWSCredentials(key, secret))
);
}
this.client = new AmazonEC2Client(credentials, clientConfiguration);
if (settings.get(CLOUD_EC2.ENDPOINT) != null) {
String endpoint = settings.get(CLOUD_EC2.ENDPOINT);
String endpoint = CLOUD_EC2.ENDPOINT_SETTING.get(settings);
if (endpoint != null) {
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
client.setEndpoint(endpoint);
} else if (settings.get(CLOUD_AWS.REGION) != null) {
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
String endpoint;
} else if (CLOUD_EC2.REGION_SETTING.exists(settings)) {
String region = CLOUD_EC2.REGION_SETTING.get(settings);
if (region.equals("us-east-1") || region.equals("us-east")) {
endpoint = "ec2.us-east-1.amazonaws.com";
} else if (region.equals("us-west") || region.equals("us-west-1")) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.aws.node;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractComponent;
@ -45,7 +46,7 @@ public class Ec2CustomNodeAttributes extends AbstractComponent implements Discov
@Override
public Map<String, String> buildAttributes() {
if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return null;
}
Map<String, String> ec2Attributes = new HashMap<>();

View File

@ -31,7 +31,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -42,11 +41,9 @@ import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -55,13 +52,6 @@ import java.util.Set;
*/
public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
private static enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}
private final TransportService transportService;
private final AmazonEC2 client;
@ -76,7 +66,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final Set<String> availabilityZones;
private final HostType hostType;
private final DISCOVERY_EC2.HostType hostType;
private final DiscoNodesCache discoNodes;
@ -87,24 +77,17 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
this.client = awsEc2Service.client();
this.version = version;
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
.toUpperCase(Locale.ROOT));
this.hostType = DISCOVERY_EC2.HOST_TYPE_SETTING.get(settings);
this.discoNodes = new DiscoNodesCache(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.get(settings));
this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME,
TimeValue.timeValueMillis(10_000L)));
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
this.bindAnyGroup = DISCOVERY_EC2.ANY_GROUP_SETTING.get(settings);
this.groups = new HashSet<>();
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
this.groups.addAll(DISCOVERY_EC2.GROUPS_SETTING.get(settings));
this.tags = settings.getByPrefix(DISCOVERY_EC2.TAG_PREFIX).getAsMap();
this.tags = DISCOVERY_EC2.TAG_SETTING.get(settings).getAsMap();
Set<String> availabilityZones = new HashSet<>();
availabilityZones.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.AVAILABILITY_ZONES)));
if (settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES) != null) {
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES)));
}
this.availabilityZones = availabilityZones;
this.availabilityZones = new HashSet<>();
availabilityZones.addAll(DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING.get(settings));
if (logger.isDebugEnabled()) {
logger.debug("using host_type [{}], tags [{}], groups [{}] with any_group [{}], availability_zones [{}]", hostType, tags, groups, bindAnyGroup, availabilityZones);

View File

@ -19,11 +19,6 @@
package org.elasticsearch.plugin.discovery.ec2;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
@ -39,6 +34,11 @@ import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.ec2.Ec2Discovery;
import org.elasticsearch.plugins.Plugin;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
/**
*
*/
@ -104,12 +104,43 @@ public class Ec2DiscoveryPlugin extends Plugin {
}
public void onModule(SettingsModule settingsModule) {
// Register global cloud aws settings: cloud.aws
settingsModule.registerSetting(AwsEc2Service.KEY_SETTING);
settingsModule.registerSetting(AwsEc2Service.SECRET_SETTING);
settingsModule.registerSetting(AwsEc2Service.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsEc2Service.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsEc2Service.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsEc2Service.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsEc2Service.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsEc2Service.SIGNER_SETTING);
settingsModule.registerSetting(AwsEc2Service.REGION_SETTING);
// Register EC2 specific settings: cloud.aws.ec2
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.KEY_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SECRET_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.REGION_SETTING);
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING);
// Register EC2 discovery settings: discovery.ec2
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.HOST_TYPE_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.ANY_GROUP_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.GROUPS_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING);
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.NODE_CACHE_TIME_SETTING);
// Filter global settings
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.KEY);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.SECRET);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.PROXY_PASSWORD);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD);
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey());
}
}

View File

@ -20,11 +20,24 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.is;
public class AWSSignersTests extends ESTestCase {
/**
* Starts Ec2DiscoveryPlugin. It's a workaround when you run test from IntelliJ. Otherwise it generates
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
*/
@BeforeClass
public static void instantiatePlugin() {
new Ec2DiscoveryPlugin(Settings.EMPTY);
}
public void testSigners() {
assertThat(signerTester(null), is(false));
assertThat(signerTester("QueryStringSignerType"), is(true));

View File

@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import java.util.Collection;
/**
* Base class for AWS tests that require credentials.
* <p>
@ -42,7 +45,6 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.extendArray("plugin.types", Ec2DiscoveryPlugin.class.getName())
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1);
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
if (Strings.hasText(System.getProperty("tests.config"))) {
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
} else {
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
}
} catch (SettingsException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
}
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(Ec2DiscoveryPlugin.class);
}
}

View File

@ -19,11 +19,14 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.Protocol;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.Ec2Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
public class Ec2DiscoverySettingsTests extends ESTestCase {
@ -41,4 +44,71 @@ public class Ec2DiscoverySettingsTests extends ESTestCase {
assertThat(discoveryReady, is(false));
}
private static final Settings AWS = Settings.builder()
.put(AwsEc2Service.KEY_SETTING.getKey(), "global-key")
.put(AwsEc2Service.SECRET_SETTING.getKey(), "global-secret")
.put(AwsEc2Service.PROTOCOL_SETTING.getKey(), "https")
.put(AwsEc2Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
.put(AwsEc2Service.PROXY_PORT_SETTING.getKey(), 10000)
.put(AwsEc2Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
.put(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
.put(AwsEc2Service.SIGNER_SETTING.getKey(), "global-signer")
.put(AwsEc2Service.REGION_SETTING.getKey(), "global-region")
.build();
private static final Settings EC2 = Settings.builder()
.put(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey(), "ec2-key")
.put(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey(), "ec2-secret")
.put(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.getKey(), "http")
.put(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.getKey(), "ec2-proxy-host")
.put(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.getKey(), 20000)
.put(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.getKey(), "ec2-proxy-username")
.put(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey(), "ec2-proxy-password")
.put(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.getKey(), "ec2-signer")
.put(AwsEc2Service.CLOUD_EC2.REGION_SETTING.getKey(), "ec2-region")
.put(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.getKey(), "ec2-endpoint")
.build();
/**
* We test when only cloud.aws settings are set
*/
public void testRepositorySettingsGlobalOnly() {
Settings nodeSettings = buildSettings(AWS);
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("global-key"));
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("global-secret"));
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTPS));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("global-region"));
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by cloud.aws.ec2 settings
*/
public void testRepositorySettingsGlobalOverloadedByEC2() {
Settings nodeSettings = buildSettings(AWS, EC2);
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("ec2-key"));
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("ec2-secret"));
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTP));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("ec2-proxy-host"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("ec2-proxy-username"));
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("ec2-proxy-password"));
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("ec2-signer"));
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("ec2-region"));
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), is("ec2-endpoint"));
}
private Settings buildSettings(Settings... global) {
Settings.Builder builder = Settings.builder();
for (Settings settings : global) {
builder.put(settings);
}
return builder.build();
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
@ -95,7 +94,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPrivateIp() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "private_ip")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -111,7 +110,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPublicIp() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "public_ip")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -127,7 +126,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPrivateDns() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "private_dns")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -145,7 +144,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPublicDns() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "public_dns")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
@ -162,7 +161,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testInvalidHostType() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.HOST_TYPE, "does_not_exist")
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "does_not_exist")
.build();
try {
buildDynamicNodes(nodeSettings, 1);
@ -175,7 +174,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testFilterByTags() throws InterruptedException {
int nodes = randomIntBetween(5, 10);
Settings nodeSettings = Settings.builder()
.put(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod")
.put(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod")
.build();
int prodInstances = 0;
@ -200,7 +199,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testFilterByMultipleTags() throws InterruptedException {
int nodes = randomIntBetween(5, 10);
Settings nodeSettings = Settings.builder()
.putArray(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod", "preprod")
.putArray(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod", "preprod")
.build();
int prodInstances = 0;
@ -252,7 +251,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testGetNodeListCached() throws Exception {
Settings.Builder builder = Settings.settingsBuilder()
.put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms");
.put(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.getKey(), "500ms");
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) {
@Override

View File

@ -23,7 +23,6 @@ package org.elasticsearch.discovery.ec2;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -39,8 +38,6 @@ import static org.hamcrest.CoreMatchers.is;
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
public void testMinimumMasterNodesStart() {
Settings nodeSettings = settingsBuilder()
.put("plugin.types", Ec2DiscoveryPlugin.class.getName())
.put("cloud.enabled", true)
.put("discovery.type", "ec2")
.build();
internalCluster().startNode(nodeSettings);

View File

@ -0,0 +1,48 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' at
# the root directory of all plugins.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# <arbitrary name1>.jar <-- classes, resources, dependencies
# <arbitrary nameN>.jar <-- any number of jars
# plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=2.0
# elasticsearch.version=2.0
# java.version=1.7
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=The S3 repository plugin adds S3 repositories.
#
# 'version': plugin's version
version=3.0.0-SNAPSHOT
#
# 'name': the plugin name
name=repository-s3
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin
#
# 'java.version' version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version' version of elasticsearch compiled against
elasticsearch.version=3.0.0-SNAPSHOT
#
### deprecated elements for jvm plugins :
#
# 'isolated': true if the plugin should have its own classloader.
# passing false is deprecated, and only intended to support plugins
# that have hard dependencies against each other. If this is
# not specified, then the plugin is isolated by default.
isolated=true
#

View File

@ -19,59 +19,61 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import java.util.Locale;
import java.util.function.Function;
/**
*
*/
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
final class CLOUD_AWS {
public static final String KEY = "cloud.aws.access_key";
public static final String SECRET = "cloud.aws.secret_key";
public static final String PROTOCOL = "cloud.aws.protocol";
public static final String PROXY_HOST = "cloud.aws.proxy.host";
public static final String PROXY_PORT = "cloud.aws.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
public static final String SIGNER = "cloud.aws.signer";
public static final String REGION = "cloud.aws.region";
// Global AWS settings (shared between discovery-ec2 and repository-s3)
// Each setting starting with `cloud.aws` also exists in discovery-ec2 project. Don't forget to update
// the code there if you change anything here.
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING =
new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
// Specific S3 settings
interface CLOUD_S3 {
Setting<String> KEY_SETTING =
new Setting<>("cloud.aws.s3.access_key", AwsS3Service.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING =
new Setting<>("cloud.aws.s3.secret_key", AwsS3Service.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING =
new Setting<>("cloud.aws.s3.protocol", AwsS3Service.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false,
Setting.Scope.CLUSTER);
Setting<String> PROXY_HOST_SETTING =
new Setting<>("cloud.aws.s3.proxy.host", AwsS3Service.PROXY_HOST_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<Integer> PROXY_PORT_SETTING =
new Setting<>("cloud.aws.s3.proxy.port", AwsS3Service.PROXY_PORT_SETTING,
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.s3.proxy.port"), false, Setting.Scope.CLUSTER);
Setting<String> PROXY_USERNAME_SETTING =
new Setting<>("cloud.aws.s3.proxy.username", AwsS3Service.PROXY_USERNAME_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
Setting<String> PROXY_PASSWORD_SETTING =
new Setting<>("cloud.aws.s3.proxy.password", AwsS3Service.PROXY_PASSWORD_SETTING, Function.identity(), false,
Setting.Scope.CLUSTER);
Setting<String> SIGNER_SETTING =
new Setting<>("cloud.aws.s3.signer", AwsS3Service.SIGNER_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING =
new Setting<>("cloud.aws.s3.region", AwsS3Service.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false,
Setting.Scope.CLUSTER);
Setting<String> ENDPOINT_SETTING =
Setting.simpleString("cloud.aws.s3.endpoint", false, Setting.Scope.CLUSTER);
}
final class CLOUD_S3 {
public static final String KEY = "cloud.aws.s3.access_key";
public static final String SECRET = "cloud.aws.s3.secret_key";
public static final String PROTOCOL = "cloud.aws.s3.protocol";
public static final String PROXY_HOST = "cloud.aws.s3.proxy.host";
public static final String PROXY_PORT = "cloud.aws.s3.proxy.port";
public static final String PROXY_USERNAME = "cloud.aws.s3.proxy.username";
public static final String PROXY_PASSWORD = "cloud.aws.s3.proxy.password";
public static final String SIGNER = "cloud.aws.s3.signer";
public static final String ENDPOINT = "cloud.aws.s3.endpoint";
}
final class REPOSITORY_S3 {
public static final String BUCKET = "repositories.s3.bucket";
public static final String ENDPOINT = "repositories.s3.endpoint";
public static final String PROTOCOL = "repositories.s3.protocol";
public static final String REGION = "repositories.s3.region";
public static final String SERVER_SIDE_ENCRYPTION = "repositories.s3.server_side_encryption";
public static final String BUFFER_SIZE = "repositories.s3.buffer_size";
public static final String MAX_RETRIES = "repositories.s3.max_retries";
public static final String CHUNK_SIZE = "repositories.s3.chunk_size";
public static final String COMPRESS = "repositories.s3.compress";
public static final String STORAGE_CLASS = "repositories.s3.storage_class";
public static final String CANNED_ACL = "repositories.s3.canned_acl";
public static final String BASE_PATH = "repositories.s3.base_path";
}
AmazonS3 client();
AmazonS3 client(String endpoint, String protocol, String region, String account, String key);
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries);
}

View File

@ -31,16 +31,14 @@ import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
@ -51,7 +49,7 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
/**
* (acceskey, endpoint) -&gt; client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<>();
@Inject
public InternalAwsS3Service(Settings settings) {
@ -59,36 +57,23 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
}
@Override
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY));
String key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET));
return getClient(endpoint, null, account, key, null);
}
@Override
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return client(endpoint, protocol, region, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
if (region != null && endpoint == null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} else if (endpoint == null) {
endpoint = getDefaultEndpoint();
}
if (account == null || key == null) {
account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY));
key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET));
public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
if (Strings.isNullOrEmpty(endpoint)) {
// We need to set the endpoint based on the region
if (region != null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} else {
// No region has been set so we will use the default endpoint
endpoint = getDefaultEndpoint();
}
}
return getClient(endpoint, protocol, account, key, maxRetries);
}
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
private synchronized AmazonS3 getClient(String endpoint, Protocol protocol, String account, String key, Integer maxRetries) {
Tuple<String, String> clientDescriptor = new Tuple<>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
@ -98,32 +83,13 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
if (protocol == null) {
protocol = settings.get(CLOUD_AWS.PROTOCOL, "https").toLowerCase(Locale.ROOT);
protocol = settings.get(CLOUD_S3.PROTOCOL, protocol).toLowerCase(Locale.ROOT);
}
clientConfiguration.setProtocol(protocol);
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
proxyHost = settings.get(CLOUD_S3.PROXY_HOST, proxyHost);
if (proxyHost != null) {
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
portString = settings.get(CLOUD_S3.PROXY_PORT, portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
String proxyUsername = settings.get(CLOUD_S3.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
String proxyPassword = settings.get(CLOUD_S3.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));
String proxyHost = CLOUD_S3.PROXY_HOST_SETTING.get(settings);
if (Strings.hasText(proxyHost)) {
Integer proxyPort = CLOUD_S3.PROXY_PORT_SETTING.get(settings);
String proxyUsername = CLOUD_S3.PROXY_USERNAME_SETTING.get(settings);
String proxyPassword = CLOUD_S3.PROXY_PASSWORD_SETTING.get(settings);
clientConfiguration
.withProxyHost(proxyHost)
@ -138,8 +104,8 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
}
// #155: we might have 3rd party users using older S3 API version
String awsSigner = settings.get(CLOUD_S3.SIGNER, settings.get(CLOUD_AWS.SIGNER));
if (awsSigner != null) {
String awsSigner = CLOUD_S3.SIGNER_SETTING.get(settings);
if (Strings.hasText(awsSigner)) {
logger.debug("using AWS API signer [{}]", awsSigner);
AwsSigner.configureSigner(awsSigner, clientConfiguration, endpoint);
}
@ -168,11 +134,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
private String getDefaultEndpoint() {
String endpoint = null;
if (settings.get(CLOUD_S3.ENDPOINT) != null) {
endpoint = settings.get(CLOUD_S3.ENDPOINT);
if (CLOUD_S3.ENDPOINT_SETTING.exists(settings)) {
endpoint = CLOUD_S3.ENDPOINT_SETTING.get(settings);
logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (settings.get(CLOUD_AWS.REGION) != null) {
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
} else if (CLOUD_S3.REGION_SETTING.exists(settings)) {
String region = CLOUD_S3.REGION_SETTING.get(settings);
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}

View File

@ -88,14 +88,70 @@ public class S3RepositoryPlugin extends Plugin {
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class);
}
public void onModule(SettingsModule module) {
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.KEY);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.SECRET);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.PROXY_PASSWORD);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET);
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD);
module.registerSettingsFilter("access_key"); // WTF is this?
module.registerSettingsFilter("secret_key"); // WTF is this?
public void onModule(SettingsModule settingsModule) {
// Register global cloud aws settings: cloud.aws
settingsModule.registerSetting(AwsS3Service.KEY_SETTING);
settingsModule.registerSetting(AwsS3Service.SECRET_SETTING);
settingsModule.registerSetting(AwsS3Service.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsS3Service.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsS3Service.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsS3Service.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsS3Service.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsS3Service.SIGNER_SETTING);
settingsModule.registerSetting(AwsS3Service.REGION_SETTING);
// Register S3 specific settings: cloud.aws.s3
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.KEY_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SECRET_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SIGNER_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.REGION_SETTING);
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING);
// Register S3 repositories settings: repositories.s3
settingsModule.registerSetting(S3Repository.Repositories.KEY_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.SECRET_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BUCKET_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.REGION_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.ENDPOINT_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.PROTOCOL_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BUFFER_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.MAX_RETRIES_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CHUNK_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.COMPRESS_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BASE_PATH_SETTING);
// Register S3 single repository settings
settingsModule.registerSetting(S3Repository.Repository.KEY_SETTING);
settingsModule.registerSetting(S3Repository.Repository.SECRET_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BUCKET_SETTING);
settingsModule.registerSetting(S3Repository.Repository.ENDPOINT_SETTING);
settingsModule.registerSetting(S3Repository.Repository.PROTOCOL_SETTING);
settingsModule.registerSetting(S3Repository.Repository.REGION_SETTING);
settingsModule.registerSetting(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BUFFER_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repository.MAX_RETRIES_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CHUNK_SIZE_SETTING);
settingsModule.registerSetting(S3Repository.Repository.COMPRESS_SETTING);
settingsModule.registerSetting(S3Repository.Repository.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BASE_PATH_SETTING);
// Filter global settings
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey());
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey());
settingsModule.registerSettingsFilter(S3Repository.Repository.KEY_SETTING.getKey());
settingsModule.registerSettingsFilter(S3Repository.Repository.SECRET_SETTING.getKey());
}
}

View File

@ -19,14 +19,14 @@
package org.elasticsearch.repositories.s3;
import com.amazonaws.Protocol;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_AWS;
import org.elasticsearch.cloud.aws.AwsS3Service.REPOSITORY_S3;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -37,6 +37,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.util.Locale;
import java.util.function.Function;
/**
* Shared file system implementation of the BlobStoreRepository
@ -55,6 +56,40 @@ public class S3Repository extends BlobStoreRepository {
public final static String TYPE = "s3";
public interface Repositories {
Setting<String> KEY_SETTING = new Setting<>("repositories.s3.access_key", AwsS3Service.CLOUD_S3.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING = new Setting<>("repositories.s3.secret_key", AwsS3Service.CLOUD_S3.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
Setting<String> BUCKET_SETTING = Setting.simpleString("repositories.s3.bucket", false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING = new Setting<>("repositories.s3.region", AwsS3Service.CLOUD_S3.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
Setting<String> ENDPOINT_SETTING = new Setting<>("repositories.s3.endpoint", AwsS3Service.CLOUD_S3.ENDPOINT_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("repositories.s3.protocol", AwsS3Service.CLOUD_S3.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("repositories.s3.server_side_encryption", false, false, Setting.Scope.CLUSTER);
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("repositories.s3.max_retries", 3, false, Setting.Scope.CLUSTER);
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB), false, Setting.Scope.CLUSTER);
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("repositories.s3.compress", false, false, Setting.Scope.CLUSTER);
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("repositories.s3.storage_class", false, Setting.Scope.CLUSTER);
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("repositories.s3.canned_acl", false, Setting.Scope.CLUSTER);
Setting<String> BASE_PATH_SETTING = Setting.simpleString("repositories.s3.base_path", false, Setting.Scope.CLUSTER);
}
public interface Repository {
Setting<String> KEY_SETTING = Setting.simpleString("access_key", false, Setting.Scope.CLUSTER);
Setting<String> SECRET_SETTING = Setting.simpleString("secret_key", false, Setting.Scope.CLUSTER);
Setting<String> BUCKET_SETTING = Setting.simpleString("bucket", false, Setting.Scope.CLUSTER);
Setting<String> ENDPOINT_SETTING = Setting.simpleString("endpoint", false, Setting.Scope.CLUSTER);
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
Setting<String> REGION_SETTING = new Setting<>("region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false, false, Setting.Scope.CLUSTER);
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("max_retries", 3, false, Setting.Scope.CLUSTER);
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("chunk_size", "-1", false, Setting.Scope.CLUSTER);
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, false, Setting.Scope.CLUSTER);
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("storage_class", false, Setting.Scope.CLUSTER);
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("canned_acl", false, Setting.Scope.CLUSTER);
Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path", false, Setting.Scope.CLUSTER);
}
private final S3BlobStore blobStore;
private final BlobPath basePath;
@ -75,62 +110,40 @@ public class S3Repository extends BlobStoreRepository {
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
String bucket = repositorySettings.settings().get("bucket", settings.get(REPOSITORY_S3.BUCKET));
String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
if (bucket == null) {
throw new RepositoryException(name.name(), "No bucket defined for s3 gateway");
}
String endpoint = repositorySettings.settings().get("endpoint", settings.get(REPOSITORY_S3.ENDPOINT));
String protocol = repositorySettings.settings().get("protocol", settings.get(REPOSITORY_S3.PROTOCOL));
String region = repositorySettings.settings().get("region", settings.get(REPOSITORY_S3.REGION));
if (region == null) {
// InternalBucket setting is not set - use global region setting
String regionSetting = settings.get(CLOUD_AWS.REGION);
if (regionSetting != null) {
regionSetting = regionSetting.toLowerCase(Locale.ENGLISH);
if ("us-east".equals(regionSetting) || "us-east-1".equals(regionSetting)) {
// Default bucket - setting region to null
region = null;
} else if ("us-west".equals(regionSetting) || "us-west-1".equals(regionSetting)) {
region = "us-west-1";
} else if ("us-west-2".equals(regionSetting)) {
region = "us-west-2";
} else if ("ap-southeast".equals(regionSetting) || "ap-southeast-1".equals(regionSetting)) {
region = "ap-southeast-1";
} else if ("ap-southeast-2".equals(regionSetting)) {
region = "ap-southeast-2";
} else if ("ap-northeast".equals(regionSetting) || "ap-northeast-1".equals(regionSetting)) {
region = "ap-northeast-1";
} else if ("eu-west".equals(regionSetting) || "eu-west-1".equals(regionSetting)) {
region = "eu-west-1";
} else if ("eu-central".equals(regionSetting) || "eu-central-1".equals(regionSetting)) {
region = "eu-central-1";
} else if ("sa-east".equals(regionSetting) || "sa-east-1".equals(regionSetting)) {
region = "sa-east-1";
} else if ("cn-north".equals(regionSetting) || "cn-north-1".equals(regionSetting)) {
region = "cn-north-1";
}
}
String endpoint = getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING);
Protocol protocol = getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING);
String region = getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING);
// If no region is defined either in region, repositories.s3.region, cloud.aws.s3.region or cloud.aws.region
// we fallback to Default bucket - null
if (Strings.isEmpty(region)) {
region = null;
}
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", settings.getAsBoolean(REPOSITORY_S3.SERVER_SIDE_ENCRYPTION, false));
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", settings.getAsBytesSize(REPOSITORY_S3.BUFFER_SIZE, null));
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", settings.getAsInt(REPOSITORY_S3.MAX_RETRIES, 3));
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize(REPOSITORY_S3.CHUNK_SIZE, new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean(REPOSITORY_S3.COMPRESS, false));
boolean serverSideEncryption = getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
ByteSizeValue bufferSize = getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
Integer maxRetries = getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
// Parse and validate the user's S3 Storage Class setting
String storageClass = repositorySettings.settings().get("storage_class", settings.get(REPOSITORY_S3.STORAGE_CLASS, null));
String cannedACL = repositorySettings.settings().get("canned_acl", settings.get(REPOSITORY_S3.CANNED_ACL, null));
String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries),
String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, key, secret, maxRetries),
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
String basePath = repositorySettings.settings().get("base_path", settings.get(REPOSITORY_S3.BASE_PATH));
String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
for(String elem : Strings.splitStringToArray(basePath, '/')) {
@ -171,4 +184,13 @@ public class S3Repository extends BlobStoreRepository {
return chunkSize;
}
public static <T> T getValue(RepositorySettings repositorySettings,
Setting<T> repositorySetting,
Setting<T> repositoriesSetting) {
if (repositorySetting.exists(repositorySettings.settings())) {
return repositorySetting.get(repositorySettings.settings());
} else {
return repositoriesSetting.get(repositorySettings.globalSettings());
}
}
}

View File

@ -20,11 +20,24 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.is;
public class AWSSignersTests extends ESTestCase {
/**
* Starts S3RepositoryPlugin. It's a workaround when you run test from IntelliJ. Otherwise it generates
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
*/
@BeforeClass
public static void instantiatePlugin() {
new S3RepositoryPlugin();
}
public void testSigners() {
assertThat(signerTester(null), is(false));
assertThat(signerTester("QueryStringSignerType"), is(true));

View File

@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import java.util.Collection;
/**
* Base class for AWS tests that require credentials.
* <p>
@ -39,10 +42,9 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.extendArray("plugin.types", S3RepositoryPlugin.class.getName(), TestAwsS3Service.TestPlugin.class.getName())
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1);
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
if (Strings.hasText(System.getProperty("tests.config"))) {
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
} else {
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
}
} catch (SettingsException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
}
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(S3RepositoryPlugin.class);
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.test.ESTestCase;
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
public class RepositoryS3SettingsTests extends ESTestCase {
private static final Settings AWS = Settings.builder()
.put(AwsS3Service.KEY_SETTING.getKey(), "global-key")
.put(AwsS3Service.SECRET_SETTING.getKey(), "global-secret")
.put(AwsS3Service.PROTOCOL_SETTING.getKey(), "https")
.put(AwsS3Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
.put(AwsS3Service.PROXY_PORT_SETTING.getKey(), 10000)
.put(AwsS3Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
.put(AwsS3Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
.put(AwsS3Service.SIGNER_SETTING.getKey(), "global-signer")
.put(AwsS3Service.REGION_SETTING.getKey(), "global-region")
.build();
private static final Settings S3 = Settings.builder()
.put(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey(), "s3-key")
.put(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey(), "s3-secret")
.put(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING.getKey(), "http")
.put(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.getKey(), "s3-proxy-host")
.put(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.getKey(), 20000)
.put(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.getKey(), "s3-proxy-username")
.put(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey(), "s3-proxy-password")
.put(AwsS3Service.CLOUD_S3.SIGNER_SETTING.getKey(), "s3-signer")
.put(AwsS3Service.CLOUD_S3.REGION_SETTING.getKey(), "s3-region")
.put(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING.getKey(), "s3-endpoint")
.build();
private static final Settings REPOSITORIES = Settings.builder()
.put(Repositories.KEY_SETTING.getKey(), "repositories-key")
.put(Repositories.SECRET_SETTING.getKey(), "repositories-secret")
.put(Repositories.BUCKET_SETTING.getKey(), "repositories-bucket")
.put(Repositories.PROTOCOL_SETTING.getKey(), "https")
.put(Repositories.REGION_SETTING.getKey(), "repositories-region")
.put(Repositories.ENDPOINT_SETTING.getKey(), "repositories-endpoint")
.put(Repositories.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
.put(Repositories.BUFFER_SIZE_SETTING.getKey(), "6mb")
.put(Repositories.MAX_RETRIES_SETTING.getKey(), 4)
.put(Repositories.CHUNK_SIZE_SETTING.getKey(), "110mb")
.put(Repositories.COMPRESS_SETTING.getKey(), true)
.put(Repositories.STORAGE_CLASS_SETTING.getKey(), "repositories-class")
.put(Repositories.CANNED_ACL_SETTING.getKey(), "repositories-acl")
.put(Repositories.BASE_PATH_SETTING.getKey(), "repositories-basepath")
.build();
private static final Settings REPOSITORY = Settings.builder()
.put(Repository.KEY_SETTING.getKey(), "repository-key")
.put(Repository.SECRET_SETTING.getKey(), "repository-secret")
.put(Repository.BUCKET_SETTING.getKey(), "repository-bucket")
.put(Repository.PROTOCOL_SETTING.getKey(), "https")
.put(Repository.REGION_SETTING.getKey(), "repository-region")
.put(Repository.ENDPOINT_SETTING.getKey(), "repository-endpoint")
.put(Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), false)
.put(Repository.BUFFER_SIZE_SETTING.getKey(), "7mb")
.put(Repository.MAX_RETRIES_SETTING.getKey(), 5)
.put(Repository.CHUNK_SIZE_SETTING.getKey(), "120mb")
.put(Repository.COMPRESS_SETTING.getKey(), false)
.put(Repository.STORAGE_CLASS_SETTING.getKey(), "repository-class")
.put(Repository.CANNED_ACL_SETTING.getKey(), "repository-acl")
.put(Repository.BASE_PATH_SETTING.getKey(), "repository-basepath")
.build();
/**
* We test when only cloud.aws settings are set
*/
public void testRepositorySettingsGlobalOnly() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("global-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("global-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("global-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), isEmptyString());
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by cloud.aws.s3 settings
*/
public void testRepositorySettingsGlobalOverloadedByS3() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("s3-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("s3-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTP));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("s3-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("s3-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by repositories.s3 settings
*/
public void testRepositorySettingsGlobalOverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* We test when cloud.aws.s3 settings are overloaded by repositories.s3 settings
*/
public void testRepositorySettingsS3OverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* We test when cloud.aws settings are overloaded by single repository settings
*/
public void testRepositorySettingsGlobalOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* We test when cloud.aws.s3 settings are overloaded by single repository settings
*/
public void testRepositorySettingsS3OverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* We test when repositories settings are overloaded by single repository settings
*/
public void testRepositorySettingsRepositoriesOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
private Settings buildSettings(Settings... global) {
Settings.Builder builder = Settings.builder();
for (Settings settings : global) {
builder.put(settings);
}
return builder.build();
}
}

View File

@ -18,11 +18,11 @@
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.Plugin;
import java.util.IdentityHashMap;
@ -51,17 +51,7 @@ public class TestAwsS3Service extends InternalAwsS3Service {
@Override
public synchronized AmazonS3 client() {
return cachedWrapper(super.client());
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.s3;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
@ -32,14 +33,12 @@ import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.After;
import org.junit.Before;
@ -54,43 +53,43 @@ import static org.hamcrest.Matchers.notNullValue;
*/
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, transportClientRatio = 0.0)
abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase {
@Override
public Settings indexSettings() {
// During restore we frequently restore index to exactly the same state it was before, that might cause the same
// checksum file to be written twice during restore operation
return Settings.builder().put(super.indexSettings())
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE_SETTING.getKey(), false)
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE_SETTING.getKey(), false)
.put("cloud.enabled", true)
.put("plugin.types", S3RepositoryPlugin.class.getName())
.put("repositories.s3.base_path", basePath)
public Settings nodeSettings(int nodeOrdinal) {
// nodeSettings is called before `wipeBefore()` so we need to define basePath here
globalBasePath = "repo-" + randomInt();
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(S3Repository.Repositories.BASE_PATH_SETTING.getKey(), globalBasePath)
.build();
}
private String basePath;
private String globalBasePath;
@Before
public final void wipeBefore() {
wipeRepositories();
basePath = "repo-" + randomInt();
cleanRepositoryFiles(basePath);
cleanRepositoryFiles(globalBasePath);
}
@After
public final void wipeAfter() {
wipeRepositories();
cleanRepositoryFiles(basePath);
cleanRepositoryFiles(globalBasePath);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testSimpleWorkflow() {
Client client = client();
Settings.Builder settings = Settings.settingsBuilder()
.put("chunk_size", randomIntBetween(1000, 10000));
.put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000));
// We sometime test getting the base_path from node settings using repositories.s3.base_path
if (usually()) {
settings.put("base_path", basePath);
settings.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath);
}
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
@ -166,9 +165,9 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("chunk_size", randomIntBetween(1000, 10000))
.put("server_side_encryption", true)
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000))
.put(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -196,11 +195,12 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
null,
null,
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
null,
S3Repository.Repositories.PROTOCOL_SETTING.get(settings),
S3Repository.Repositories.REGION_SETTING.get(settings),
S3Repository.Repositories.KEY_SETTING.get(settings),
S3Repository.Repositories.SECRET_SETTING.get(settings),
null);
String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
@ -260,26 +260,37 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
try {
client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
).get();
fail("repository verification should have raise an exception!");
} catch (RepositoryVerificationException e) {
}
}
public void testRepositoryWithBasePath() {
Client client = client();
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
public void testRepositoryWithCustomCredentials() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("region", bucketSettings.get("region"))
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("bucket", bucketSettings.get("bucket"))
).get();
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
.put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
.put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
@ -292,12 +303,12 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repostoriy with endpoint [{}], bucket[{}] and path [{}]", bucketSettings.get("endpoint"), bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("bucket", bucketSettings.get("bucket"))
.put("endpoint", bucketSettings.get("endpoint"))
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("base_path", basePath)
).get();
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
.put(S3Repository.Repository.ENDPOINT_SETTING.getKey(), bucketSettings.get("endpoint"))
.put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
.put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
@ -313,8 +324,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
try {
client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
// Below setting intentionally omitted to assert bucket is not available in default region.
// .put("region", privateBucketSettings.get("region"))
).get();
@ -331,10 +342,10 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
.put("region", bucketSettings.get("region"))
).get();
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
.put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
@ -348,7 +359,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -369,8 +380,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repository without any path");
PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
).get();
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
try {
@ -454,17 +465,17 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
settings.getByPrefix("repositories.s3.external-bucket.")
};
for (Settings bucket : buckets) {
String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));
String region = bucket.get("region", settings.get("repositories.s3.region"));
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
String endpoint = bucket.get("endpoint", S3Repository.Repositories.ENDPOINT_SETTING.get(settings));
Protocol protocol = S3Repository.Repositories.PROTOCOL_SETTING.get(settings);
String region = bucket.get("region", S3Repository.Repositories.REGION_SETTING.get(settings));
String accessKey = bucket.get("access_key", S3Repository.Repositories.KEY_SETTING.get(settings));
String secretKey = bucket.get("secret_key", S3Repository.Repositories.SECRET_SETTING.get(settings));
String bucketName = bucket.get("bucket");
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey, null);
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html