parent
65391e8a83
commit
fb7723c186
|
@ -19,42 +19,79 @@
|
|||
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
|
||||
public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {
|
||||
final class CLOUD_AWS {
|
||||
public static final String KEY = "cloud.aws.access_key";
|
||||
public static final String SECRET = "cloud.aws.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.proxy.host";
|
||||
public static final String PROXY_PORT = "cloud.aws.proxy.port";
|
||||
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
|
||||
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
|
||||
public static final String SIGNER = "cloud.aws.signer";
|
||||
public static final String REGION = "cloud.aws.region";
|
||||
public interface AwsEc2Service {
|
||||
Setting<Boolean> AUTO_ATTRIBUTE_SETTING = Setting.boolSetting("cloud.node.auto_attributes", false, false, Setting.Scope.CLUSTER);
|
||||
|
||||
// Global AWS settings (shared between discovery-ec2 and repository-s3)
|
||||
// Each setting starting with `cloud.aws` also exists in repository-s3 project. Don't forget to update
|
||||
// the code there if you change anything here.
|
||||
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)),
|
||||
false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
|
||||
interface CLOUD_EC2 {
|
||||
Setting<String> KEY_SETTING = new Setting<>("cloud.aws.ec2.access_key", AwsEc2Service.KEY_SETTING, Function.identity(), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING = new Setting<>("cloud.aws.ec2.secret_key", AwsEc2Service.SECRET_SETTING, Function.identity(), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("cloud.aws.ec2.protocol", AwsEc2Service.PROTOCOL_SETTING,
|
||||
s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_HOST_SETTING = new Setting<>("cloud.aws.ec2.proxy.host", AwsEc2Service.PROXY_HOST_SETTING,
|
||||
Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> PROXY_PORT_SETTING = new Setting<>("cloud.aws.ec2.proxy.port", AwsEc2Service.PROXY_PORT_SETTING,
|
||||
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.ec2.proxy.port"), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_USERNAME_SETTING = new Setting<>("cloud.aws.ec2.proxy.username", AwsEc2Service.PROXY_USERNAME_SETTING,
|
||||
Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_PASSWORD_SETTING = new Setting<>("cloud.aws.ec2.proxy.password", AwsEc2Service.PROXY_PASSWORD_SETTING,
|
||||
Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SIGNER_SETTING = new Setting<>("cloud.aws.ec2.signer", AwsEc2Service.SIGNER_SETTING, Function.identity(),
|
||||
false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.ec2.region", AwsEc2Service.REGION_SETTING,
|
||||
s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> ENDPOINT_SETTING = Setting.simpleString("cloud.aws.ec2.endpoint", false, Setting.Scope.CLUSTER);
|
||||
}
|
||||
|
||||
final class CLOUD_EC2 {
|
||||
public static final String KEY = "cloud.aws.ec2.access_key";
|
||||
public static final String SECRET = "cloud.aws.ec2.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.ec2.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.ec2.proxy.host";
|
||||
public static final String PROXY_PORT = "cloud.aws.ec2.proxy.port";
|
||||
public static final String PROXY_USERNAME = "cloud.aws.ec2.proxy.username";
|
||||
public static final String PROXY_PASSWORD = "cloud.aws.ec2.proxy.password";
|
||||
public static final String SIGNER = "cloud.aws.ec2.signer";
|
||||
public static final String ENDPOINT = "cloud.aws.ec2.endpoint";
|
||||
}
|
||||
interface DISCOVERY_EC2 {
|
||||
enum HostType {
|
||||
PRIVATE_IP,
|
||||
PUBLIC_IP,
|
||||
PRIVATE_DNS,
|
||||
PUBLIC_DNS
|
||||
}
|
||||
|
||||
final class DISCOVERY_EC2 {
|
||||
public static final String HOST_TYPE = "discovery.ec2.host_type";
|
||||
public static final String ANY_GROUP = "discovery.ec2.any_group";
|
||||
public static final String GROUPS = "discovery.ec2.groups";
|
||||
public static final String TAG_PREFIX = "discovery.ec2.tag.";
|
||||
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
|
||||
public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time";
|
||||
Setting<HostType> HOST_TYPE_SETTING =
|
||||
new Setting<>("discovery.ec2.host_type", HostType.PRIVATE_IP.name(), s -> HostType.valueOf(s.toUpperCase(Locale.ROOT)), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<Boolean> ANY_GROUP_SETTING =
|
||||
Setting.boolSetting("discovery.ec2.any_group", true, false, Setting.Scope.CLUSTER);
|
||||
Setting<List<String>> GROUPS_SETTING =
|
||||
Setting.listSetting("discovery.ec2.groups", new ArrayList<>(), s -> s.toString(), false, Setting.Scope.CLUSTER);
|
||||
Setting<List<String>> AVAILABILITY_ZONES_SETTING =
|
||||
Setting.listSetting("discovery.ec2.availability_zones", Collections.emptyList(), s -> s.toString(), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<TimeValue> NODE_CACHE_TIME_SETTING =
|
||||
Setting.timeSetting("discovery.ec2.node_cache_time", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
|
||||
|
||||
Setting<Settings> TAG_SETTING = Setting.groupSetting("discovery.ec2.tag.", false,Setting.Scope.CLUSTER);
|
||||
}
|
||||
|
||||
AmazonEC2 client();
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.cloud.aws;
|
|||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonWebServiceRequest;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
|
@ -33,18 +32,17 @@ import com.amazonaws.internal.StaticCredentialsProvider;
|
|||
import com.amazonaws.retry.RetryPolicy;
|
||||
import com.amazonaws.services.ec2.AmazonEC2;
|
||||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
|
||||
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
|
@ -74,30 +72,15 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
|
|||
// the response metadata cache is only there for diagnostics purposes,
|
||||
// but can force objects from every response to the old generation.
|
||||
clientConfiguration.setResponseMetadataCacheSize(0);
|
||||
String protocol = settings.get(CLOUD_EC2.PROTOCOL, settings.get(CLOUD_AWS.PROTOCOL, "https")).toLowerCase(Locale.ROOT);
|
||||
if ("http".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTP);
|
||||
} else if ("https".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTPS);
|
||||
} else {
|
||||
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
|
||||
}
|
||||
String account = settings.get(CLOUD_EC2.KEY, settings.get(CLOUD_AWS.KEY));
|
||||
String key = settings.get(CLOUD_EC2.SECRET, settings.get(CLOUD_AWS.SECRET));
|
||||
clientConfiguration.setProtocol(CLOUD_EC2.PROTOCOL_SETTING.get(settings));
|
||||
String key = CLOUD_EC2.KEY_SETTING.get(settings);
|
||||
String secret = CLOUD_EC2.SECRET_SETTING.get(settings);
|
||||
|
||||
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
|
||||
proxyHost = settings.get(CLOUD_EC2.PROXY_HOST, proxyHost);
|
||||
String proxyHost = CLOUD_EC2.PROXY_HOST_SETTING.get(settings);
|
||||
if (proxyHost != null) {
|
||||
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
|
||||
portString = settings.get(CLOUD_EC2.PROXY_PORT, portString);
|
||||
Integer proxyPort;
|
||||
try {
|
||||
proxyPort = Integer.parseInt(portString, 10);
|
||||
} catch (NumberFormatException ex) {
|
||||
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
|
||||
}
|
||||
String proxyUsername = settings.get(CLOUD_EC2.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
|
||||
String proxyPassword = settings.get(CLOUD_EC2.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));
|
||||
Integer proxyPort = CLOUD_EC2.PROXY_PORT_SETTING.get(settings);
|
||||
String proxyUsername = CLOUD_EC2.PROXY_USERNAME_SETTING.get(settings);
|
||||
String proxyPassword = CLOUD_EC2.PROXY_PASSWORD_SETTING.get(settings);
|
||||
|
||||
clientConfiguration
|
||||
.withProxyHost(proxyHost)
|
||||
|
@ -107,15 +90,10 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
|
|||
}
|
||||
|
||||
// #155: we might have 3rd party users using older EC2 API version
|
||||
String awsSigner = settings.get(CLOUD_EC2.SIGNER, settings.get(CLOUD_AWS.SIGNER));
|
||||
if (awsSigner != null) {
|
||||
String awsSigner = CLOUD_EC2.SIGNER_SETTING.get(settings);
|
||||
if (Strings.hasText(awsSigner)) {
|
||||
logger.debug("using AWS API signer [{}]", awsSigner);
|
||||
try {
|
||||
AwsSigner.configureSigner(awsSigner, clientConfiguration);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.warn("wrong signer set for [{}] or [{}]: [{}]",
|
||||
CLOUD_EC2.SIGNER, CLOUD_AWS.SIGNER, awsSigner);
|
||||
}
|
||||
AwsSigner.configureSigner(awsSigner, clientConfiguration);
|
||||
}
|
||||
|
||||
// Increase the number of retries in case of 5xx API responses
|
||||
|
@ -138,7 +116,7 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
|
|||
|
||||
AWSCredentialsProvider credentials;
|
||||
|
||||
if (account == null && key == null) {
|
||||
if (key == null && secret == null) {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new EnvironmentVariableCredentialsProvider(),
|
||||
new SystemPropertiesCredentialsProvider(),
|
||||
|
@ -146,19 +124,18 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
|
|||
);
|
||||
} else {
|
||||
credentials = new AWSCredentialsProviderChain(
|
||||
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
|
||||
new StaticCredentialsProvider(new BasicAWSCredentials(key, secret))
|
||||
);
|
||||
}
|
||||
|
||||
this.client = new AmazonEC2Client(credentials, clientConfiguration);
|
||||
|
||||
if (settings.get(CLOUD_EC2.ENDPOINT) != null) {
|
||||
String endpoint = settings.get(CLOUD_EC2.ENDPOINT);
|
||||
String endpoint = CLOUD_EC2.ENDPOINT_SETTING.get(settings);
|
||||
if (endpoint != null) {
|
||||
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
|
||||
client.setEndpoint(endpoint);
|
||||
} else if (settings.get(CLOUD_AWS.REGION) != null) {
|
||||
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
|
||||
String endpoint;
|
||||
} else if (CLOUD_EC2.REGION_SETTING.exists(settings)) {
|
||||
String region = CLOUD_EC2.REGION_SETTING.get(settings);
|
||||
if (region.equals("us-east-1") || region.equals("us-east")) {
|
||||
endpoint = "ec2.us-east-1.amazonaws.com";
|
||||
} else if (region.equals("us-west") || region.equals("us-west-1")) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.cloud.aws.node;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -45,7 +46,7 @@ public class Ec2CustomNodeAttributes extends AbstractComponent implements Discov
|
|||
|
||||
@Override
|
||||
public Map<String, String> buildAttributes() {
|
||||
if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) {
|
||||
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
|
||||
return null;
|
||||
}
|
||||
Map<String, String> ec2Attributes = new HashMap<>();
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -42,11 +41,9 @@ import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -55,13 +52,6 @@ import java.util.Set;
|
|||
*/
|
||||
public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
|
||||
|
||||
private static enum HostType {
|
||||
PRIVATE_IP,
|
||||
PUBLIC_IP,
|
||||
PRIVATE_DNS,
|
||||
PUBLIC_DNS
|
||||
}
|
||||
|
||||
private final TransportService transportService;
|
||||
|
||||
private final AmazonEC2 client;
|
||||
|
@ -76,7 +66,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
|||
|
||||
private final Set<String> availabilityZones;
|
||||
|
||||
private final HostType hostType;
|
||||
private final DISCOVERY_EC2.HostType hostType;
|
||||
|
||||
private final DiscoNodesCache discoNodes;
|
||||
|
||||
|
@ -87,24 +77,17 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
|||
this.client = awsEc2Service.client();
|
||||
this.version = version;
|
||||
|
||||
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
||||
.toUpperCase(Locale.ROOT));
|
||||
this.hostType = DISCOVERY_EC2.HOST_TYPE_SETTING.get(settings);
|
||||
this.discoNodes = new DiscoNodesCache(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.get(settings));
|
||||
|
||||
this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME,
|
||||
TimeValue.timeValueMillis(10_000L)));
|
||||
|
||||
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
|
||||
this.bindAnyGroup = DISCOVERY_EC2.ANY_GROUP_SETTING.get(settings);
|
||||
this.groups = new HashSet<>();
|
||||
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
|
||||
this.groups.addAll(DISCOVERY_EC2.GROUPS_SETTING.get(settings));
|
||||
|
||||
this.tags = settings.getByPrefix(DISCOVERY_EC2.TAG_PREFIX).getAsMap();
|
||||
this.tags = DISCOVERY_EC2.TAG_SETTING.get(settings).getAsMap();
|
||||
|
||||
Set<String> availabilityZones = new HashSet<>();
|
||||
availabilityZones.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.AVAILABILITY_ZONES)));
|
||||
if (settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES) != null) {
|
||||
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get(DISCOVERY_EC2.AVAILABILITY_ZONES)));
|
||||
}
|
||||
this.availabilityZones = availabilityZones;
|
||||
this.availabilityZones = new HashSet<>();
|
||||
availabilityZones.addAll(DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING.get(settings));
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("using host_type [{}], tags [{}], groups [{}] with any_group [{}], availability_zones [{}]", hostType, tags, groups, bindAnyGroup, availabilityZones);
|
||||
|
|
|
@ -19,11 +19,6 @@
|
|||
|
||||
package org.elasticsearch.plugin.discovery.ec2;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
|
@ -39,6 +34,11 @@ import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
|
|||
import org.elasticsearch.discovery.ec2.Ec2Discovery;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -104,12 +104,43 @@ public class Ec2DiscoveryPlugin extends Plugin {
|
|||
}
|
||||
|
||||
public void onModule(SettingsModule settingsModule) {
|
||||
// Register global cloud aws settings: cloud.aws
|
||||
settingsModule.registerSetting(AwsEc2Service.KEY_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.SECRET_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.PROXY_HOST_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.PROXY_PORT_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.PROXY_USERNAME_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.PROXY_PASSWORD_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.SIGNER_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.REGION_SETTING);
|
||||
|
||||
// Register EC2 specific settings: cloud.aws.ec2
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.KEY_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SECRET_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.REGION_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING);
|
||||
|
||||
// Register EC2 discovery settings: discovery.ec2
|
||||
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.HOST_TYPE_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.ANY_GROUP_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.GROUPS_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.AVAILABILITY_ZONES_SETTING);
|
||||
settingsModule.registerSetting(AwsEc2Service.DISCOVERY_EC2.NODE_CACHE_TIME_SETTING);
|
||||
|
||||
// Filter global settings
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.KEY);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.SECRET);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_AWS.PROXY_PASSWORD);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD);
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.KEY_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.SECRET_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,11 +20,24 @@
|
|||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
||||
public class AWSSignersTests extends ESTestCase {
|
||||
|
||||
/**
|
||||
* Starts Ec2DiscoveryPlugin. It's a workaround when you run test from IntelliJ. Otherwise it generates
|
||||
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void instantiatePlugin() {
|
||||
new Ec2DiscoveryPlugin(Settings.EMPTY);
|
||||
}
|
||||
|
||||
public void testSigners() {
|
||||
assertThat(signerTester(null), is(false));
|
||||
assertThat(signerTester("QueryStringSignerType"), is(true));
|
||||
|
|
|
@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Base class for AWS tests that require credentials.
|
||||
* <p>
|
||||
|
@ -42,7 +45,6 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
|
|||
Settings.Builder settings = Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
|
||||
.extendArray("plugin.types", Ec2DiscoveryPlugin.class.getName())
|
||||
.put("cloud.aws.test.random", randomInt())
|
||||
.put("cloud.aws.test.write_failures", 0.1)
|
||||
.put("cloud.aws.test.read_failures", 0.1);
|
||||
|
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
|
|||
if (Strings.hasText(System.getProperty("tests.config"))) {
|
||||
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
|
||||
} else {
|
||||
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
|
||||
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
|
||||
}
|
||||
} catch (SettingsException exception) {
|
||||
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
|
||||
}
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(Ec2DiscoveryPlugin.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,14 @@
|
|||
|
||||
package org.elasticsearch.discovery.ec2;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.Ec2Module;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isEmptyString;
|
||||
|
||||
public class Ec2DiscoverySettingsTests extends ESTestCase {
|
||||
|
||||
|
@ -41,4 +44,71 @@ public class Ec2DiscoverySettingsTests extends ESTestCase {
|
|||
assertThat(discoveryReady, is(false));
|
||||
}
|
||||
|
||||
|
||||
private static final Settings AWS = Settings.builder()
|
||||
.put(AwsEc2Service.KEY_SETTING.getKey(), "global-key")
|
||||
.put(AwsEc2Service.SECRET_SETTING.getKey(), "global-secret")
|
||||
.put(AwsEc2Service.PROTOCOL_SETTING.getKey(), "https")
|
||||
.put(AwsEc2Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
|
||||
.put(AwsEc2Service.PROXY_PORT_SETTING.getKey(), 10000)
|
||||
.put(AwsEc2Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
|
||||
.put(AwsEc2Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
|
||||
.put(AwsEc2Service.SIGNER_SETTING.getKey(), "global-signer")
|
||||
.put(AwsEc2Service.REGION_SETTING.getKey(), "global-region")
|
||||
.build();
|
||||
|
||||
private static final Settings EC2 = Settings.builder()
|
||||
.put(AwsEc2Service.CLOUD_EC2.KEY_SETTING.getKey(), "ec2-key")
|
||||
.put(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.getKey(), "ec2-secret")
|
||||
.put(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.getKey(), "http")
|
||||
.put(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.getKey(), "ec2-proxy-host")
|
||||
.put(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.getKey(), 20000)
|
||||
.put(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.getKey(), "ec2-proxy-username")
|
||||
.put(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.getKey(), "ec2-proxy-password")
|
||||
.put(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.getKey(), "ec2-signer")
|
||||
.put(AwsEc2Service.CLOUD_EC2.REGION_SETTING.getKey(), "ec2-region")
|
||||
.put(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.getKey(), "ec2-endpoint")
|
||||
.build();
|
||||
|
||||
/**
|
||||
* We test when only cloud.aws settings are set
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOnly() {
|
||||
Settings nodeSettings = buildSettings(AWS);
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("global-key"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("global-secret"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTPS));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("global-region"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), isEmptyString());
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws settings are overloaded by cloud.aws.ec2 settings
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOverloadedByEC2() {
|
||||
Settings nodeSettings = buildSettings(AWS, EC2);
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.KEY_SETTING.get(nodeSettings), is("ec2-key"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.SECRET_SETTING.get(nodeSettings), is("ec2-secret"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROTOCOL_SETTING.get(nodeSettings), is(Protocol.HTTP));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_HOST_SETTING.get(nodeSettings), is("ec2-proxy-host"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_USERNAME_SETTING.get(nodeSettings), is("ec2-proxy-username"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.PROXY_PASSWORD_SETTING.get(nodeSettings), is("ec2-proxy-password"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.SIGNER_SETTING.get(nodeSettings), is("ec2-signer"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.REGION_SETTING.get(nodeSettings), is("ec2-region"));
|
||||
assertThat(AwsEc2Service.CLOUD_EC2.ENDPOINT_SETTING.get(nodeSettings), is("ec2-endpoint"));
|
||||
}
|
||||
|
||||
private Settings buildSettings(Settings... global) {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
for (Settings settings : global) {
|
||||
builder.put(settings);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.discovery.ec2;
|
||||
|
||||
import com.amazonaws.services.ec2.model.Tag;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
|
||||
|
@ -95,7 +94,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testPrivateIp() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
||||
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
|
@ -111,7 +110,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testPublicIp() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "public_ip")
|
||||
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
|
@ -127,7 +126,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testPrivateDns() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "private_dns")
|
||||
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "private_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
|
@ -145,7 +144,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testPublicDns() throws InterruptedException {
|
||||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "public_dns")
|
||||
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "public_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
|
@ -162,7 +161,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
|
||||
public void testInvalidHostType() throws InterruptedException {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.HOST_TYPE, "does_not_exist")
|
||||
.put(DISCOVERY_EC2.HOST_TYPE_SETTING.getKey(), "does_not_exist")
|
||||
.build();
|
||||
try {
|
||||
buildDynamicNodes(nodeSettings, 1);
|
||||
|
@ -175,7 +174,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testFilterByTags() throws InterruptedException {
|
||||
int nodes = randomIntBetween(5, 10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod")
|
||||
.put(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod")
|
||||
.build();
|
||||
|
||||
int prodInstances = 0;
|
||||
|
@ -200,7 +199,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
public void testFilterByMultipleTags() throws InterruptedException {
|
||||
int nodes = randomIntBetween(5, 10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.putArray(DISCOVERY_EC2.TAG_PREFIX + "stage", "prod", "preprod")
|
||||
.putArray(DISCOVERY_EC2.TAG_SETTING.getKey() + "stage", "prod", "preprod")
|
||||
.build();
|
||||
|
||||
int prodInstances = 0;
|
||||
|
@ -252,7 +251,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
|
||||
public void testGetNodeListCached() throws Exception {
|
||||
Settings.Builder builder = Settings.settingsBuilder()
|
||||
.put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms");
|
||||
.put(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.getKey(), "500ms");
|
||||
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
|
||||
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) {
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@ package org.elasticsearch.discovery.ec2;
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
||||
|
@ -39,8 +38,6 @@ import static org.hamcrest.CoreMatchers.is;
|
|||
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
|
||||
public void testMinimumMasterNodesStart() {
|
||||
Settings nodeSettings = settingsBuilder()
|
||||
.put("plugin.types", Ec2DiscoveryPlugin.class.getName())
|
||||
.put("cloud.enabled", true)
|
||||
.put("discovery.type", "ec2")
|
||||
.build();
|
||||
internalCluster().startNode(nodeSettings);
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
# Elasticsearch plugin descriptor file
|
||||
# This file must exist as 'plugin-descriptor.properties' at
|
||||
# the root directory of all plugins.
|
||||
#
|
||||
### example plugin for "foo"
|
||||
#
|
||||
# foo.zip <-- zip file for the plugin, with this structure:
|
||||
# <arbitrary name1>.jar <-- classes, resources, dependencies
|
||||
# <arbitrary nameN>.jar <-- any number of jars
|
||||
# plugin-descriptor.properties <-- example contents below:
|
||||
#
|
||||
# classname=foo.bar.BazPlugin
|
||||
# description=My cool plugin
|
||||
# version=2.0
|
||||
# elasticsearch.version=2.0
|
||||
# java.version=1.7
|
||||
#
|
||||
### mandatory elements for all plugins:
|
||||
#
|
||||
# 'description': simple summary of the plugin
|
||||
description=The S3 repository plugin adds S3 repositories.
|
||||
#
|
||||
# 'version': plugin's version
|
||||
version=3.0.0-SNAPSHOT
|
||||
#
|
||||
# 'name': the plugin name
|
||||
name=repository-s3
|
||||
#
|
||||
# 'classname': the name of the class to load, fully-qualified.
|
||||
classname=org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin
|
||||
#
|
||||
# 'java.version' version of java the code is built against
|
||||
# use the system property java.specification.version
|
||||
# version string must be a sequence of nonnegative decimal integers
|
||||
# separated by "."'s and may have leading zeros
|
||||
java.version=1.8
|
||||
#
|
||||
# 'elasticsearch.version' version of elasticsearch compiled against
|
||||
elasticsearch.version=3.0.0-SNAPSHOT
|
||||
#
|
||||
### deprecated elements for jvm plugins :
|
||||
#
|
||||
# 'isolated': true if the plugin should have its own classloader.
|
||||
# passing false is deprecated, and only intended to support plugins
|
||||
# that have hard dependencies against each other. If this is
|
||||
# not specified, then the plugin is isolated by default.
|
||||
isolated=true
|
||||
#
|
|
@ -19,59 +19,61 @@
|
|||
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
|
||||
|
||||
final class CLOUD_AWS {
|
||||
public static final String KEY = "cloud.aws.access_key";
|
||||
public static final String SECRET = "cloud.aws.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.proxy.host";
|
||||
public static final String PROXY_PORT = "cloud.aws.proxy.port";
|
||||
public static final String PROXY_USERNAME = "cloud.aws.proxy.username";
|
||||
public static final String PROXY_PASSWORD = "cloud.aws.proxy.password";
|
||||
public static final String SIGNER = "cloud.aws.signer";
|
||||
public static final String REGION = "cloud.aws.region";
|
||||
// Global AWS settings (shared between discovery-ec2 and repository-s3)
|
||||
// Each setting starting with `cloud.aws` also exists in discovery-ec2 project. Don't forget to update
|
||||
// the code there if you change anything here.
|
||||
Setting<String> KEY_SETTING = Setting.simpleString("cloud.aws.access_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING = Setting.simpleString("cloud.aws.secret_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING =
|
||||
new Setting<>("cloud.aws.protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("cloud.aws.proxy.host", false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("cloud.aws.proxy.port", 80, 0, 1<<16, false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_USERNAME_SETTING = Setting.simpleString("cloud.aws.proxy.username", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_PASSWORD_SETTING = Setting.simpleString("cloud.aws.proxy.password", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SIGNER_SETTING = Setting.simpleString("cloud.aws.signer", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING = new Setting<>("cloud.aws.region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
|
||||
// Specific S3 settings
|
||||
interface CLOUD_S3 {
|
||||
Setting<String> KEY_SETTING =
|
||||
new Setting<>("cloud.aws.s3.access_key", AwsS3Service.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING =
|
||||
new Setting<>("cloud.aws.s3.secret_key", AwsS3Service.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING =
|
||||
new Setting<>("cloud.aws.s3.protocol", AwsS3Service.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_HOST_SETTING =
|
||||
new Setting<>("cloud.aws.s3.proxy.host", AwsS3Service.PROXY_HOST_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> PROXY_PORT_SETTING =
|
||||
new Setting<>("cloud.aws.s3.proxy.port", AwsS3Service.PROXY_PORT_SETTING,
|
||||
s -> Setting.parseInt(s, 0, 1<<16, "cloud.aws.s3.proxy.port"), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_USERNAME_SETTING =
|
||||
new Setting<>("cloud.aws.s3.proxy.username", AwsS3Service.PROXY_USERNAME_SETTING, Function.identity(), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<String> PROXY_PASSWORD_SETTING =
|
||||
new Setting<>("cloud.aws.s3.proxy.password", AwsS3Service.PROXY_PASSWORD_SETTING, Function.identity(), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<String> SIGNER_SETTING =
|
||||
new Setting<>("cloud.aws.s3.signer", AwsS3Service.SIGNER_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING =
|
||||
new Setting<>("cloud.aws.s3.region", AwsS3Service.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
Setting<String> ENDPOINT_SETTING =
|
||||
Setting.simpleString("cloud.aws.s3.endpoint", false, Setting.Scope.CLUSTER);
|
||||
}
|
||||
|
||||
final class CLOUD_S3 {
|
||||
public static final String KEY = "cloud.aws.s3.access_key";
|
||||
public static final String SECRET = "cloud.aws.s3.secret_key";
|
||||
public static final String PROTOCOL = "cloud.aws.s3.protocol";
|
||||
public static final String PROXY_HOST = "cloud.aws.s3.proxy.host";
|
||||
public static final String PROXY_PORT = "cloud.aws.s3.proxy.port";
|
||||
public static final String PROXY_USERNAME = "cloud.aws.s3.proxy.username";
|
||||
public static final String PROXY_PASSWORD = "cloud.aws.s3.proxy.password";
|
||||
public static final String SIGNER = "cloud.aws.s3.signer";
|
||||
public static final String ENDPOINT = "cloud.aws.s3.endpoint";
|
||||
}
|
||||
|
||||
final class REPOSITORY_S3 {
|
||||
public static final String BUCKET = "repositories.s3.bucket";
|
||||
public static final String ENDPOINT = "repositories.s3.endpoint";
|
||||
public static final String PROTOCOL = "repositories.s3.protocol";
|
||||
public static final String REGION = "repositories.s3.region";
|
||||
public static final String SERVER_SIDE_ENCRYPTION = "repositories.s3.server_side_encryption";
|
||||
public static final String BUFFER_SIZE = "repositories.s3.buffer_size";
|
||||
public static final String MAX_RETRIES = "repositories.s3.max_retries";
|
||||
public static final String CHUNK_SIZE = "repositories.s3.chunk_size";
|
||||
public static final String COMPRESS = "repositories.s3.compress";
|
||||
public static final String STORAGE_CLASS = "repositories.s3.storage_class";
|
||||
public static final String CANNED_ACL = "repositories.s3.canned_acl";
|
||||
public static final String BASE_PATH = "repositories.s3.base_path";
|
||||
}
|
||||
|
||||
|
||||
|
||||
AmazonS3 client();
|
||||
|
||||
AmazonS3 client(String endpoint, String protocol, String region, String account, String key);
|
||||
|
||||
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
|
||||
AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries);
|
||||
}
|
||||
|
|
|
@ -31,16 +31,14 @@ import com.amazonaws.http.IdleConnectionReaper;
|
|||
import com.amazonaws.internal.StaticCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -51,7 +49,7 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
|||
/**
|
||||
* (acceskey, endpoint) -> client
|
||||
*/
|
||||
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
|
||||
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<>();
|
||||
|
||||
@Inject
|
||||
public InternalAwsS3Service(Settings settings) {
|
||||
|
@ -59,36 +57,23 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client() {
|
||||
String endpoint = getDefaultEndpoint();
|
||||
String account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY));
|
||||
String key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET));
|
||||
return getClient(endpoint, null, account, key, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
|
||||
return client(endpoint, protocol, region, account, key, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
|
||||
if (region != null && endpoint == null) {
|
||||
endpoint = getEndpoint(region);
|
||||
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
|
||||
} else if (endpoint == null) {
|
||||
endpoint = getDefaultEndpoint();
|
||||
}
|
||||
if (account == null || key == null) {
|
||||
account = settings.get(CLOUD_S3.KEY, settings.get(CLOUD_AWS.KEY));
|
||||
key = settings.get(CLOUD_S3.SECRET, settings.get(CLOUD_AWS.SECRET));
|
||||
public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
|
||||
if (Strings.isNullOrEmpty(endpoint)) {
|
||||
// We need to set the endpoint based on the region
|
||||
if (region != null) {
|
||||
endpoint = getEndpoint(region);
|
||||
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
|
||||
} else {
|
||||
// No region has been set so we will use the default endpoint
|
||||
endpoint = getDefaultEndpoint();
|
||||
}
|
||||
}
|
||||
|
||||
return getClient(endpoint, protocol, account, key, maxRetries);
|
||||
}
|
||||
|
||||
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) {
|
||||
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
|
||||
private synchronized AmazonS3 getClient(String endpoint, Protocol protocol, String account, String key, Integer maxRetries) {
|
||||
Tuple<String, String> clientDescriptor = new Tuple<>(endpoint, account);
|
||||
AmazonS3Client client = clients.get(clientDescriptor);
|
||||
if (client != null) {
|
||||
return client;
|
||||
|
@ -98,32 +83,13 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
|||
// the response metadata cache is only there for diagnostics purposes,
|
||||
// but can force objects from every response to the old generation.
|
||||
clientConfiguration.setResponseMetadataCacheSize(0);
|
||||
if (protocol == null) {
|
||||
protocol = settings.get(CLOUD_AWS.PROTOCOL, "https").toLowerCase(Locale.ROOT);
|
||||
protocol = settings.get(CLOUD_S3.PROTOCOL, protocol).toLowerCase(Locale.ROOT);
|
||||
}
|
||||
clientConfiguration.setProtocol(protocol);
|
||||
|
||||
if ("http".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTP);
|
||||
} else if ("https".equals(protocol)) {
|
||||
clientConfiguration.setProtocol(Protocol.HTTPS);
|
||||
} else {
|
||||
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
|
||||
}
|
||||
|
||||
String proxyHost = settings.get(CLOUD_AWS.PROXY_HOST);
|
||||
proxyHost = settings.get(CLOUD_S3.PROXY_HOST, proxyHost);
|
||||
if (proxyHost != null) {
|
||||
String portString = settings.get(CLOUD_AWS.PROXY_PORT, "80");
|
||||
portString = settings.get(CLOUD_S3.PROXY_PORT, portString);
|
||||
Integer proxyPort;
|
||||
try {
|
||||
proxyPort = Integer.parseInt(portString, 10);
|
||||
} catch (NumberFormatException ex) {
|
||||
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
|
||||
}
|
||||
String proxyUsername = settings.get(CLOUD_S3.PROXY_USERNAME, settings.get(CLOUD_AWS.PROXY_USERNAME));
|
||||
String proxyPassword = settings.get(CLOUD_S3.PROXY_PASSWORD, settings.get(CLOUD_AWS.PROXY_PASSWORD));
|
||||
String proxyHost = CLOUD_S3.PROXY_HOST_SETTING.get(settings);
|
||||
if (Strings.hasText(proxyHost)) {
|
||||
Integer proxyPort = CLOUD_S3.PROXY_PORT_SETTING.get(settings);
|
||||
String proxyUsername = CLOUD_S3.PROXY_USERNAME_SETTING.get(settings);
|
||||
String proxyPassword = CLOUD_S3.PROXY_PASSWORD_SETTING.get(settings);
|
||||
|
||||
clientConfiguration
|
||||
.withProxyHost(proxyHost)
|
||||
|
@ -138,8 +104,8 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
|||
}
|
||||
|
||||
// #155: we might have 3rd party users using older S3 API version
|
||||
String awsSigner = settings.get(CLOUD_S3.SIGNER, settings.get(CLOUD_AWS.SIGNER));
|
||||
if (awsSigner != null) {
|
||||
String awsSigner = CLOUD_S3.SIGNER_SETTING.get(settings);
|
||||
if (Strings.hasText(awsSigner)) {
|
||||
logger.debug("using AWS API signer [{}]", awsSigner);
|
||||
AwsSigner.configureSigner(awsSigner, clientConfiguration, endpoint);
|
||||
}
|
||||
|
@ -168,11 +134,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
|
|||
|
||||
private String getDefaultEndpoint() {
|
||||
String endpoint = null;
|
||||
if (settings.get(CLOUD_S3.ENDPOINT) != null) {
|
||||
endpoint = settings.get(CLOUD_S3.ENDPOINT);
|
||||
if (CLOUD_S3.ENDPOINT_SETTING.exists(settings)) {
|
||||
endpoint = CLOUD_S3.ENDPOINT_SETTING.get(settings);
|
||||
logger.debug("using explicit s3 endpoint [{}]", endpoint);
|
||||
} else if (settings.get(CLOUD_AWS.REGION) != null) {
|
||||
String region = settings.get(CLOUD_AWS.REGION).toLowerCase(Locale.ROOT);
|
||||
} else if (CLOUD_S3.REGION_SETTING.exists(settings)) {
|
||||
String region = CLOUD_S3.REGION_SETTING.get(settings);
|
||||
endpoint = getEndpoint(region);
|
||||
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
|
||||
}
|
||||
|
|
|
@ -88,14 +88,70 @@ public class S3RepositoryPlugin extends Plugin {
|
|||
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class, BlobStoreIndexShardRepository.class);
|
||||
}
|
||||
|
||||
public void onModule(SettingsModule module) {
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.KEY);
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.SECRET);
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_AWS.PROXY_PASSWORD);
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY);
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET);
|
||||
module.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD);
|
||||
module.registerSettingsFilter("access_key"); // WTF is this?
|
||||
module.registerSettingsFilter("secret_key"); // WTF is this?
|
||||
public void onModule(SettingsModule settingsModule) {
|
||||
// Register global cloud aws settings: cloud.aws
|
||||
settingsModule.registerSetting(AwsS3Service.KEY_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.SECRET_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.PROXY_HOST_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.PROXY_PORT_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.PROXY_USERNAME_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.PROXY_PASSWORD_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.SIGNER_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.REGION_SETTING);
|
||||
|
||||
// Register S3 specific settings: cloud.aws.s3
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.KEY_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SECRET_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.SIGNER_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.REGION_SETTING);
|
||||
settingsModule.registerSetting(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING);
|
||||
|
||||
// Register S3 repositories settings: repositories.s3
|
||||
settingsModule.registerSetting(S3Repository.Repositories.KEY_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.SECRET_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.BUCKET_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.REGION_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.ENDPOINT_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.BUFFER_SIZE_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.MAX_RETRIES_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.CHUNK_SIZE_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.COMPRESS_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.STORAGE_CLASS_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.CANNED_ACL_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repositories.BASE_PATH_SETTING);
|
||||
|
||||
// Register S3 single repository settings
|
||||
settingsModule.registerSetting(S3Repository.Repository.KEY_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.SECRET_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.BUCKET_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.ENDPOINT_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.PROTOCOL_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.REGION_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.BUFFER_SIZE_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.MAX_RETRIES_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.CHUNK_SIZE_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.COMPRESS_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.STORAGE_CLASS_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.CANNED_ACL_SETTING);
|
||||
settingsModule.registerSetting(S3Repository.Repository.BASE_PATH_SETTING);
|
||||
|
||||
// Filter global settings
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.KEY_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.SECRET_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.PROXY_PASSWORD_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilterIfMissing(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilter(S3Repository.Repository.KEY_SETTING.getKey());
|
||||
settingsModule.registerSettingsFilter(S3Repository.Repository.SECRET_SETTING.getKey());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,14 @@
|
|||
|
||||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import org.elasticsearch.cloud.aws.AwsS3Service;
|
||||
import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_AWS;
|
||||
import org.elasticsearch.cloud.aws.AwsS3Service.REPOSITORY_S3;
|
||||
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRepository;
|
||||
|
@ -37,6 +37,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Shared file system implementation of the BlobStoreRepository
|
||||
|
@ -55,6 +56,40 @@ public class S3Repository extends BlobStoreRepository {
|
|||
|
||||
public final static String TYPE = "s3";
|
||||
|
||||
public interface Repositories {
|
||||
Setting<String> KEY_SETTING = new Setting<>("repositories.s3.access_key", AwsS3Service.CLOUD_S3.KEY_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING = new Setting<>("repositories.s3.secret_key", AwsS3Service.CLOUD_S3.SECRET_SETTING, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> BUCKET_SETTING = Setting.simpleString("repositories.s3.bucket", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING = new Setting<>("repositories.s3.region", AwsS3Service.CLOUD_S3.REGION_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> ENDPOINT_SETTING = new Setting<>("repositories.s3.endpoint", AwsS3Service.CLOUD_S3.ENDPOINT_SETTING, s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("repositories.s3.protocol", AwsS3Service.CLOUD_S3.PROTOCOL_SETTING, s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
|
||||
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("repositories.s3.server_side_encryption", false, false, Setting.Scope.CLUSTER);
|
||||
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("repositories.s3.max_retries", 3, false, Setting.Scope.CLUSTER);
|
||||
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB), false, Setting.Scope.CLUSTER);
|
||||
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("repositories.s3.compress", false, false, Setting.Scope.CLUSTER);
|
||||
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("repositories.s3.storage_class", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("repositories.s3.canned_acl", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> BASE_PATH_SETTING = Setting.simpleString("repositories.s3.base_path", false, Setting.Scope.CLUSTER);
|
||||
}
|
||||
|
||||
public interface Repository {
|
||||
Setting<String> KEY_SETTING = Setting.simpleString("access_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> SECRET_SETTING = Setting.simpleString("secret_key", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> BUCKET_SETTING = Setting.simpleString("bucket", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> ENDPOINT_SETTING = Setting.simpleString("endpoint", false, Setting.Scope.CLUSTER);
|
||||
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("protocol", "https", s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), false, Setting.Scope.CLUSTER);
|
||||
Setting<String> REGION_SETTING = new Setting<>("region", "", s -> s.toLowerCase(Locale.ROOT), false, Setting.Scope.CLUSTER);
|
||||
Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false, false, Setting.Scope.CLUSTER);
|
||||
Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", S3BlobStore.MIN_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
|
||||
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("max_retries", 3, false, Setting.Scope.CLUSTER);
|
||||
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("chunk_size", "-1", false, Setting.Scope.CLUSTER);
|
||||
Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, false, Setting.Scope.CLUSTER);
|
||||
Setting<String> STORAGE_CLASS_SETTING = Setting.simpleString("storage_class", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> CANNED_ACL_SETTING = Setting.simpleString("canned_acl", false, Setting.Scope.CLUSTER);
|
||||
Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path", false, Setting.Scope.CLUSTER);
|
||||
}
|
||||
|
||||
private final S3BlobStore blobStore;
|
||||
|
||||
private final BlobPath basePath;
|
||||
|
@ -75,62 +110,40 @@ public class S3Repository extends BlobStoreRepository {
|
|||
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
|
||||
super(name.getName(), repositorySettings, indexShardRepository);
|
||||
|
||||
String bucket = repositorySettings.settings().get("bucket", settings.get(REPOSITORY_S3.BUCKET));
|
||||
String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
|
||||
if (bucket == null) {
|
||||
throw new RepositoryException(name.name(), "No bucket defined for s3 gateway");
|
||||
}
|
||||
|
||||
String endpoint = repositorySettings.settings().get("endpoint", settings.get(REPOSITORY_S3.ENDPOINT));
|
||||
String protocol = repositorySettings.settings().get("protocol", settings.get(REPOSITORY_S3.PROTOCOL));
|
||||
|
||||
String region = repositorySettings.settings().get("region", settings.get(REPOSITORY_S3.REGION));
|
||||
if (region == null) {
|
||||
// InternalBucket setting is not set - use global region setting
|
||||
String regionSetting = settings.get(CLOUD_AWS.REGION);
|
||||
if (regionSetting != null) {
|
||||
regionSetting = regionSetting.toLowerCase(Locale.ENGLISH);
|
||||
if ("us-east".equals(regionSetting) || "us-east-1".equals(regionSetting)) {
|
||||
// Default bucket - setting region to null
|
||||
region = null;
|
||||
} else if ("us-west".equals(regionSetting) || "us-west-1".equals(regionSetting)) {
|
||||
region = "us-west-1";
|
||||
} else if ("us-west-2".equals(regionSetting)) {
|
||||
region = "us-west-2";
|
||||
} else if ("ap-southeast".equals(regionSetting) || "ap-southeast-1".equals(regionSetting)) {
|
||||
region = "ap-southeast-1";
|
||||
} else if ("ap-southeast-2".equals(regionSetting)) {
|
||||
region = "ap-southeast-2";
|
||||
} else if ("ap-northeast".equals(regionSetting) || "ap-northeast-1".equals(regionSetting)) {
|
||||
region = "ap-northeast-1";
|
||||
} else if ("eu-west".equals(regionSetting) || "eu-west-1".equals(regionSetting)) {
|
||||
region = "eu-west-1";
|
||||
} else if ("eu-central".equals(regionSetting) || "eu-central-1".equals(regionSetting)) {
|
||||
region = "eu-central-1";
|
||||
} else if ("sa-east".equals(regionSetting) || "sa-east-1".equals(regionSetting)) {
|
||||
region = "sa-east-1";
|
||||
} else if ("cn-north".equals(regionSetting) || "cn-north-1".equals(regionSetting)) {
|
||||
region = "cn-north-1";
|
||||
}
|
||||
}
|
||||
String endpoint = getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING);
|
||||
Protocol protocol = getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING);
|
||||
String region = getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING);
|
||||
// If no region is defined either in region, repositories.s3.region, cloud.aws.s3.region or cloud.aws.region
|
||||
// we fallback to Default bucket - null
|
||||
if (Strings.isEmpty(region)) {
|
||||
region = null;
|
||||
}
|
||||
|
||||
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", settings.getAsBoolean(REPOSITORY_S3.SERVER_SIDE_ENCRYPTION, false));
|
||||
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", settings.getAsBytesSize(REPOSITORY_S3.BUFFER_SIZE, null));
|
||||
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", settings.getAsInt(REPOSITORY_S3.MAX_RETRIES, 3));
|
||||
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize(REPOSITORY_S3.CHUNK_SIZE, new ByteSizeValue(100, ByteSizeUnit.MB)));
|
||||
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean(REPOSITORY_S3.COMPRESS, false));
|
||||
boolean serverSideEncryption = getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
|
||||
ByteSizeValue bufferSize = getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
|
||||
Integer maxRetries = getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
|
||||
this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
|
||||
this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
|
||||
|
||||
// Parse and validate the user's S3 Storage Class setting
|
||||
String storageClass = repositorySettings.settings().get("storage_class", settings.get(REPOSITORY_S3.STORAGE_CLASS, null));
|
||||
String cannedACL = repositorySettings.settings().get("canned_acl", settings.get(REPOSITORY_S3.CANNED_ACL, null));
|
||||
String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
|
||||
String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
|
||||
|
||||
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]",
|
||||
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
|
||||
|
||||
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries),
|
||||
String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
|
||||
String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
|
||||
|
||||
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, key, secret, maxRetries),
|
||||
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
|
||||
|
||||
String basePath = repositorySettings.settings().get("base_path", settings.get(REPOSITORY_S3.BASE_PATH));
|
||||
String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
|
||||
if (Strings.hasLength(basePath)) {
|
||||
BlobPath path = new BlobPath();
|
||||
for(String elem : Strings.splitStringToArray(basePath, '/')) {
|
||||
|
@ -171,4 +184,13 @@ public class S3Repository extends BlobStoreRepository {
|
|||
return chunkSize;
|
||||
}
|
||||
|
||||
public static <T> T getValue(RepositorySettings repositorySettings,
|
||||
Setting<T> repositorySetting,
|
||||
Setting<T> repositoriesSetting) {
|
||||
if (repositorySetting.exists(repositorySettings.settings())) {
|
||||
return repositorySetting.get(repositorySettings.settings());
|
||||
} else {
|
||||
return repositoriesSetting.get(repositorySettings.globalSettings());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,24 @@
|
|||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
||||
public class AWSSignersTests extends ESTestCase {
|
||||
|
||||
/**
|
||||
* Starts S3RepositoryPlugin. It's a workaround when you run test from IntelliJ. Otherwise it generates
|
||||
* java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessDeclaredMembers")
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void instantiatePlugin() {
|
||||
new S3RepositoryPlugin();
|
||||
}
|
||||
|
||||
public void testSigners() {
|
||||
assertThat(signerTester(null), is(false));
|
||||
assertThat(signerTester("QueryStringSignerType"), is(true));
|
||||
|
|
|
@ -25,9 +25,12 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Base class for AWS tests that require credentials.
|
||||
* <p>
|
||||
|
@ -39,10 +42,9 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Settings.Builder settings = Settings.builder()
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
|
||||
.extendArray("plugin.types", S3RepositoryPlugin.class.getName(), TestAwsS3Service.TestPlugin.class.getName())
|
||||
.put("cloud.aws.test.random", randomInt())
|
||||
.put("cloud.aws.test.write_failures", 0.1)
|
||||
.put("cloud.aws.test.read_failures", 0.1);
|
||||
|
@ -52,11 +54,16 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
|
|||
if (Strings.hasText(System.getProperty("tests.config"))) {
|
||||
settings.loadFromPath(PathUtils.get(System.getProperty("tests.config")));
|
||||
} else {
|
||||
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
|
||||
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
|
||||
}
|
||||
} catch (SettingsException exception) {
|
||||
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
|
||||
}
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(S3RepositoryPlugin.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,302 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.RepositorySettings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isEmptyString;
|
||||
|
||||
public class RepositoryS3SettingsTests extends ESTestCase {
|
||||
|
||||
private static final Settings AWS = Settings.builder()
|
||||
.put(AwsS3Service.KEY_SETTING.getKey(), "global-key")
|
||||
.put(AwsS3Service.SECRET_SETTING.getKey(), "global-secret")
|
||||
.put(AwsS3Service.PROTOCOL_SETTING.getKey(), "https")
|
||||
.put(AwsS3Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
|
||||
.put(AwsS3Service.PROXY_PORT_SETTING.getKey(), 10000)
|
||||
.put(AwsS3Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
|
||||
.put(AwsS3Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
|
||||
.put(AwsS3Service.SIGNER_SETTING.getKey(), "global-signer")
|
||||
.put(AwsS3Service.REGION_SETTING.getKey(), "global-region")
|
||||
.build();
|
||||
|
||||
private static final Settings S3 = Settings.builder()
|
||||
.put(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey(), "s3-key")
|
||||
.put(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey(), "s3-secret")
|
||||
.put(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING.getKey(), "http")
|
||||
.put(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.getKey(), "s3-proxy-host")
|
||||
.put(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.getKey(), 20000)
|
||||
.put(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.getKey(), "s3-proxy-username")
|
||||
.put(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey(), "s3-proxy-password")
|
||||
.put(AwsS3Service.CLOUD_S3.SIGNER_SETTING.getKey(), "s3-signer")
|
||||
.put(AwsS3Service.CLOUD_S3.REGION_SETTING.getKey(), "s3-region")
|
||||
.put(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING.getKey(), "s3-endpoint")
|
||||
.build();
|
||||
|
||||
private static final Settings REPOSITORIES = Settings.builder()
|
||||
.put(Repositories.KEY_SETTING.getKey(), "repositories-key")
|
||||
.put(Repositories.SECRET_SETTING.getKey(), "repositories-secret")
|
||||
.put(Repositories.BUCKET_SETTING.getKey(), "repositories-bucket")
|
||||
.put(Repositories.PROTOCOL_SETTING.getKey(), "https")
|
||||
.put(Repositories.REGION_SETTING.getKey(), "repositories-region")
|
||||
.put(Repositories.ENDPOINT_SETTING.getKey(), "repositories-endpoint")
|
||||
.put(Repositories.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
|
||||
.put(Repositories.BUFFER_SIZE_SETTING.getKey(), "6mb")
|
||||
.put(Repositories.MAX_RETRIES_SETTING.getKey(), 4)
|
||||
.put(Repositories.CHUNK_SIZE_SETTING.getKey(), "110mb")
|
||||
.put(Repositories.COMPRESS_SETTING.getKey(), true)
|
||||
.put(Repositories.STORAGE_CLASS_SETTING.getKey(), "repositories-class")
|
||||
.put(Repositories.CANNED_ACL_SETTING.getKey(), "repositories-acl")
|
||||
.put(Repositories.BASE_PATH_SETTING.getKey(), "repositories-basepath")
|
||||
.build();
|
||||
|
||||
private static final Settings REPOSITORY = Settings.builder()
|
||||
.put(Repository.KEY_SETTING.getKey(), "repository-key")
|
||||
.put(Repository.SECRET_SETTING.getKey(), "repository-secret")
|
||||
.put(Repository.BUCKET_SETTING.getKey(), "repository-bucket")
|
||||
.put(Repository.PROTOCOL_SETTING.getKey(), "https")
|
||||
.put(Repository.REGION_SETTING.getKey(), "repository-region")
|
||||
.put(Repository.ENDPOINT_SETTING.getKey(), "repository-endpoint")
|
||||
.put(Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), false)
|
||||
.put(Repository.BUFFER_SIZE_SETTING.getKey(), "7mb")
|
||||
.put(Repository.MAX_RETRIES_SETTING.getKey(), 5)
|
||||
.put(Repository.CHUNK_SIZE_SETTING.getKey(), "120mb")
|
||||
.put(Repository.COMPRESS_SETTING.getKey(), false)
|
||||
.put(Repository.STORAGE_CLASS_SETTING.getKey(), "repository-class")
|
||||
.put(Repository.CANNED_ACL_SETTING.getKey(), "repository-acl")
|
||||
.put(Repository.BASE_PATH_SETTING.getKey(), "repository-basepath")
|
||||
.build();
|
||||
|
||||
/**
|
||||
* We test when only cloud.aws settings are set
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOnly() {
|
||||
Settings nodeSettings = buildSettings(AWS);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("global-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("global-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("global-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), isEmptyString());
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws settings are overloaded by cloud.aws.s3 settings
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOverloadedByS3() {
|
||||
Settings nodeSettings = buildSettings(AWS, S3);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("s3-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("s3-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTP));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("s3-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("s3-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws settings are overloaded by repositories.s3 settings
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOverloadedByRepositories() {
|
||||
Settings nodeSettings = buildSettings(AWS, REPOSITORIES);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(true));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
|
||||
is("repositories-class"));
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws.s3 settings are overloaded by repositories.s3 settings
|
||||
*/
|
||||
public void testRepositorySettingsS3OverloadedByRepositories() {
|
||||
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(true));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
|
||||
is("repositories-class"));
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws settings are overloaded by single repository settings
|
||||
*/
|
||||
public void testRepositorySettingsGlobalOverloadedByRepository() {
|
||||
Settings nodeSettings = buildSettings(AWS);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
|
||||
is("repository-class"));
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when cloud.aws.s3 settings are overloaded by single repository settings
|
||||
*/
|
||||
public void testRepositorySettingsS3OverloadedByRepository() {
|
||||
Settings nodeSettings = buildSettings(AWS, S3);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
|
||||
is("repository-class"));
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
|
||||
}
|
||||
|
||||
/**
|
||||
* We test when repositories settings are overloaded by single repository settings
|
||||
*/
|
||||
public void testRepositorySettingsRepositoriesOverloadedByRepository() {
|
||||
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
|
||||
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
|
||||
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
|
||||
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
|
||||
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
|
||||
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
|
||||
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
|
||||
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
|
||||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
|
||||
is("repository-class"));
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
|
||||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
|
||||
}
|
||||
|
||||
private Settings buildSettings(Settings... global) {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
for (Settings settings : global) {
|
||||
builder.put(settings);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
|
@ -18,11 +18,11 @@
|
|||
*/
|
||||
package org.elasticsearch.cloud.aws;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.IdentityHashMap;
|
||||
|
@ -51,17 +51,7 @@ public class TestAwsS3Service extends InternalAwsS3Service {
|
|||
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client() {
|
||||
return cachedWrapper(super.client());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
|
||||
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
|
||||
public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
|
||||
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
|
@ -32,14 +33,12 @@ import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
|
|||
import org.elasticsearch.cloud.aws.AwsS3Service;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
|
||||
import org.elasticsearch.repositories.RepositoryMissingException;
|
||||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -54,43 +53,43 @@ import static org.hamcrest.Matchers.notNullValue;
|
|||
*/
|
||||
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, transportClientRatio = 0.0)
|
||||
abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase {
|
||||
|
||||
@Override
|
||||
public Settings indexSettings() {
|
||||
// During restore we frequently restore index to exactly the same state it was before, that might cause the same
|
||||
// checksum file to be written twice during restore operation
|
||||
return Settings.builder().put(super.indexSettings())
|
||||
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE_SETTING.getKey(), false)
|
||||
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE_SETTING.getKey(), false)
|
||||
.put("cloud.enabled", true)
|
||||
.put("plugin.types", S3RepositoryPlugin.class.getName())
|
||||
.put("repositories.s3.base_path", basePath)
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
// nodeSettings is called before `wipeBefore()` so we need to define basePath here
|
||||
globalBasePath = "repo-" + randomInt();
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(S3Repository.Repositories.BASE_PATH_SETTING.getKey(), globalBasePath)
|
||||
.build();
|
||||
}
|
||||
|
||||
private String basePath;
|
||||
private String globalBasePath;
|
||||
|
||||
@Before
|
||||
public final void wipeBefore() {
|
||||
wipeRepositories();
|
||||
basePath = "repo-" + randomInt();
|
||||
cleanRepositoryFiles(basePath);
|
||||
cleanRepositoryFiles(globalBasePath);
|
||||
}
|
||||
|
||||
@After
|
||||
public final void wipeAfter() {
|
||||
wipeRepositories();
|
||||
cleanRepositoryFiles(basePath);
|
||||
cleanRepositoryFiles(globalBasePath);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
|
||||
public void testSimpleWorkflow() {
|
||||
Client client = client();
|
||||
Settings.Builder settings = Settings.settingsBuilder()
|
||||
.put("chunk_size", randomIntBetween(1000, 10000));
|
||||
.put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000));
|
||||
|
||||
// We sometime test getting the base_path from node settings using repositories.s3.base_path
|
||||
if (usually()) {
|
||||
settings.put("base_path", basePath);
|
||||
settings.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath);
|
||||
}
|
||||
|
||||
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
|
||||
|
@ -166,9 +165,9 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put("chunk_size", randomIntBetween(1000, 10000))
|
||||
.put("server_side_encryption", true)
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
.put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000))
|
||||
.put(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
|
@ -196,11 +195,12 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
Settings settings = internalCluster().getInstance(Settings.class);
|
||||
Settings bucket = settings.getByPrefix("repositories.s3.");
|
||||
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
|
||||
null,
|
||||
null,
|
||||
bucket.get("region", settings.get("repositories.s3.region")),
|
||||
bucket.get("access_key", settings.get("cloud.aws.access_key")),
|
||||
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
|
||||
null,
|
||||
S3Repository.Repositories.PROTOCOL_SETTING.get(settings),
|
||||
S3Repository.Repositories.REGION_SETTING.get(settings),
|
||||
S3Repository.Repositories.KEY_SETTING.get(settings),
|
||||
S3Repository.Repositories.SECRET_SETTING.get(settings),
|
||||
null);
|
||||
|
||||
String bucketName = bucket.get("bucket");
|
||||
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
|
||||
|
@ -260,26 +260,37 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
try {
|
||||
client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put("bucket", bucketSettings.get("bucket"))
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
|
||||
).get();
|
||||
fail("repository verification should have raise an exception!");
|
||||
} catch (RepositoryVerificationException e) {
|
||||
}
|
||||
}
|
||||
|
||||
public void testRepositoryWithBasePath() {
|
||||
Client client = client();
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
assertRepositoryIsOperational(client, "test-repo");
|
||||
}
|
||||
|
||||
public void testRepositoryWithCustomCredentials() {
|
||||
Client client = client();
|
||||
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
|
||||
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put("region", bucketSettings.get("region"))
|
||||
.put("access_key", bucketSettings.get("access_key"))
|
||||
.put("secret_key", bucketSettings.get("secret_key"))
|
||||
.put("bucket", bucketSettings.get("bucket"))
|
||||
).get();
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
.put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
|
||||
.put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
|
||||
.put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
|
||||
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
assertRepositoryIsOperational(client, "test-repo");
|
||||
|
@ -292,12 +303,12 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
logger.info("--> creating s3 repostoriy with endpoint [{}], bucket[{}] and path [{}]", bucketSettings.get("endpoint"), bucketSettings.get("bucket"), basePath);
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("bucket", bucketSettings.get("bucket"))
|
||||
.put("endpoint", bucketSettings.get("endpoint"))
|
||||
.put("access_key", bucketSettings.get("access_key"))
|
||||
.put("secret_key", bucketSettings.get("secret_key"))
|
||||
.put("base_path", basePath)
|
||||
).get();
|
||||
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
|
||||
.put(S3Repository.Repository.ENDPOINT_SETTING.getKey(), bucketSettings.get("endpoint"))
|
||||
.put(S3Repository.Repository.KEY_SETTING.getKey(), bucketSettings.get("access_key"))
|
||||
.put(S3Repository.Repository.SECRET_SETTING.getKey(), bucketSettings.get("secret_key"))
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
assertRepositoryIsOperational(client, "test-repo");
|
||||
}
|
||||
|
@ -313,8 +324,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
try {
|
||||
client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put("bucket", bucketSettings.get("bucket"))
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
|
||||
// Below setting intentionally omitted to assert bucket is not available in default region.
|
||||
// .put("region", privateBucketSettings.get("region"))
|
||||
).get();
|
||||
|
@ -331,10 +342,10 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put("bucket", bucketSettings.get("bucket"))
|
||||
.put("region", bucketSettings.get("region"))
|
||||
).get();
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
.put(S3Repository.Repository.BUCKET_SETTING.getKey(), bucketSettings.get("bucket"))
|
||||
.put(S3Repository.Repository.REGION_SETTING.getKey(), bucketSettings.get("region"))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
assertRepositoryIsOperational(client, "test-repo");
|
||||
|
@ -348,7 +359,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
|
@ -369,8 +380,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
logger.info("--> creating s3 repository without any path");
|
||||
PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo")
|
||||
.setType("s3").setSettings(Settings.settingsBuilder()
|
||||
.put("base_path", basePath)
|
||||
).get();
|
||||
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
try {
|
||||
|
@ -454,17 +465,17 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
|
|||
settings.getByPrefix("repositories.s3.external-bucket.")
|
||||
};
|
||||
for (Settings bucket : buckets) {
|
||||
String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
|
||||
String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));
|
||||
String region = bucket.get("region", settings.get("repositories.s3.region"));
|
||||
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
|
||||
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
|
||||
String endpoint = bucket.get("endpoint", S3Repository.Repositories.ENDPOINT_SETTING.get(settings));
|
||||
Protocol protocol = S3Repository.Repositories.PROTOCOL_SETTING.get(settings);
|
||||
String region = bucket.get("region", S3Repository.Repositories.REGION_SETTING.get(settings));
|
||||
String accessKey = bucket.get("access_key", S3Repository.Repositories.KEY_SETTING.get(settings));
|
||||
String secretKey = bucket.get("secret_key", S3Repository.Repositories.SECRET_SETTING.get(settings));
|
||||
String bucketName = bucket.get("bucket");
|
||||
|
||||
// We check that settings has been set in elasticsearch.yml integration test file
|
||||
// as described in README
|
||||
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
|
||||
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
|
||||
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey, null);
|
||||
try {
|
||||
ObjectListing prevListing = null;
|
||||
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
|
||||
|
|
Loading…
Reference in New Issue