migrate branch for cloud-aws

Simon Willnauer 2015-06-05 13:12:38 +02:00
commit b3088d50a1
34 changed files with 4417 additions and 0 deletions

plugins/cloud-aws/README.md Normal file

@@ -0,0 +1,361 @@
AWS Cloud Plugin for Elasticsearch
==================================
The Amazon Web Service (AWS) Cloud plugin allows Elasticsearch to use the [AWS API](https://github.com/aws/aws-sdk-java)
for unicast discovery and to add S3 repositories.
In order to install the plugin, run:
```sh
bin/plugin install elasticsearch/elasticsearch-cloud-aws/2.5.1
```
You need to install a version matching your Elasticsearch version:
| Elasticsearch | AWS Cloud Plugin | Docs |
|------------------------|-------------------|------------------------------------------------------------------------------------------------------------------------------------|
| master | Build from source | See below |
| es-1.x | Build from source | [2.6.0-SNAPSHOT](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/es-1.x/#version-260-snapshot-for-elasticsearch-1x) |
| es-1.5 | 2.5.1 | [2.5.1](https://github.com/elastic/elasticsearch-cloud-aws/tree/v2.5.1/#version-251-for-elasticsearch-15) |
| es-1.4 | 2.4.2 | [2.4.2](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v2.4.2/#version-242-for-elasticsearch-14) |
| es-1.3 | 2.3.0 | [2.3.0](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v2.3.0/#version-230-for-elasticsearch-13) |
| es-1.2 | 2.2.0 | [2.2.0](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v2.2.0/#aws-cloud-plugin-for-elasticsearch) |
| es-1.1 | 2.1.1 | [2.1.1](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v2.1.1/#aws-cloud-plugin-for-elasticsearch) |
| es-1.0 | 2.0.0 | [2.0.0](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v2.0.0/#aws-cloud-plugin-for-elasticsearch) |
| es-0.90 | 1.16.0 | [1.16.0](https://github.com/elasticsearch/elasticsearch-cloud-aws/tree/v1.16.0/#aws-cloud-plugin-for-elasticsearch) |
To build a `SNAPSHOT` version, you need to build it with Maven:
```bash
mvn clean install
plugin --install cloud-aws \
--url file:target/releases/elasticsearch-cloud-aws-X.X.X-SNAPSHOT.zip
```
## Generic Configuration
The plugin will default to using [IAM Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) credentials
for authentication. These can be overridden, in increasing order of precedence, by the system properties `aws.accessKeyId` and `aws.secretKey`,
the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_KEY`, or the elasticsearch config settings `cloud.aws.access_key` and `cloud.aws.secret_key`:
```
cloud:
    aws:
        access_key: AKVAIQBF2RECL7FJWGJQ
        secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br
```
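For reference, the same credentials could instead come from the two other sources. A minimal sketch, assuming your startup script passes `ES_JAVA_OPTS` through to the JVM (the key values are placeholders):
```sh
# Environment variables, read by the AWS SDK's EnvironmentVariableCredentialsProvider
export AWS_ACCESS_KEY_ID=<access key>
export AWS_SECRET_KEY=<secret key>

# Or JVM system properties, read by SystemPropertiesCredentialsProvider
export ES_JAVA_OPTS="-Daws.accessKeyId=<access key> -Daws.secretKey=<secret key>"
```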
### Transport security
By default this plugin uses HTTPS for all API calls to AWS endpoints. If you wish to use HTTP instead, set
`cloud.aws.protocol` in the elasticsearch config. You can optionally override this setting per individual service
via `cloud.aws.ec2.protocol` or `cloud.aws.s3.protocol`:
```
cloud:
    aws:
        protocol: https
        s3:
            protocol: http
        ec2:
            protocol: https
```
In addition, a proxy can be configured with the `proxy_host` and `proxy_port` settings (note that protocol can be `http` or `https`):
```
cloud:
    aws:
        protocol: https
        proxy_host: proxy1.company.com
        proxy_port: 8083
```
You can also set different proxies for `ec2` and `s3`:
```
cloud:
    aws:
        s3:
            proxy_host: proxy1.company.com
            proxy_port: 8083
        ec2:
            proxy_host: proxy2.company.com
            proxy_port: 8083
```
### Region
The `cloud.aws.region` setting selects a region and automatically uses the relevant settings for both `ec2` and `s3`. The available values are listed below, followed by an example:
* `us-east` (`us-east-1`)
* `us-west` (`us-west-1`)
* `us-west-1`
* `us-west-2`
* `ap-southeast` (`ap-southeast-1`)
* `ap-southeast-1`
* `ap-southeast-2`
* `ap-northeast` (`ap-northeast-1`)
* `eu-west` (`eu-west-1`)
* `eu-central` (`eu-central-1`)
* `sa-east` (`sa-east-1`)
* `cn-north` (`cn-north-1`)
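For example, to select a region globally for both services:
```
cloud:
    aws:
        region: us-west-2
```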
### EC2/S3 Signer API
If you are using a compatible EC2 or S3 service, it might expect an older API for signing requests.
You can select the signer to use via `cloud.aws.signer` (or, per service, `cloud.aws.ec2.signer` and `cloud.aws.s3.signer`).
Defaults to `AWS4SignerType`.
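As a sketch, this is how you could point only the S3 client at the older S3 signature (`S3SignerType` is, to our knowledge, the AWS SDK's identifier for the legacy S3 signer; verify the value against your SDK version):
```
cloud:
    aws:
        s3:
            signer: S3SignerType
```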
## EC2 Discovery
EC2 discovery uses the EC2 APIs to perform automatic discovery (similar to multicast in non-hostile multicast environments). Here is a simple sample configuration:
```
discovery:
    type: ec2
```
EC2 discovery uses the same credentials as the rest of the AWS services provided by this plugin (such as the S3 `repositories`).
See [Generic Configuration](#generic-configuration) for details.
The following settings (prefixed with `discovery.ec2`) can further control the discovery; a combined example follows the list:
* `groups`: Either a comma-separated list or an array of (security) groups. Only instances with the given security groups will be used for cluster discovery. (NOTE: You can provide either group NAMEs or group IDs.)
* `host_type`: The type of host to use for communication with other instances. Can be one of `private_ip`, `public_ip`, `private_dns`, `public_dns`. Defaults to `private_ip`.
* `availability_zones`: Either a comma-separated list or an array of availability zones. Only instances within the given availability zones will be used for cluster discovery.
* `any_group`: If set to `false`, all of the listed security groups must be present on an instance for it to be used for discovery. Defaults to `true`.
* `ping_timeout`: How long to wait for existing EC2 nodes to reply during discovery. Defaults to `3s`. If no unit such as `ms`, `s` or `m` is given, milliseconds are used.
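A sketch combining several of these settings (the group and zone names are placeholders):
```
discovery:
    type: ec2
    ec2:
        groups: "es-cluster"
        host_type: private_ip
        availability_zones: "us-west-2a,us-west-2b"
        ping_timeout: "5s"
```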
### Recommended EC2 Permissions
EC2 discovery requires making a call to the EC2 service. You'll want to set up an IAM policy to allow this. You can create a custom policy via the IAM Management Console. It should look similar to this:
```js
{
    "Statement": [
        {
            "Action": [
                "ec2:DescribeInstances"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}
```
### Filtering by Tags
The ec2 discovery can also filter the machines to include in the cluster based on tags (and not just groups). The settings to use have the `discovery.ec2.tag.` prefix. For example, setting `discovery.ec2.tag.stage` to `dev` will include only instances that have the tag key `stage` with a value of `dev`. If several tags are set, an instance must carry all of them to be included.
One practical use for tag filtering is when an ec2 cluster contains many nodes that are not running elasticsearch. In this case (particularly with high `ping_timeout` values) there is a risk that a new node's discovery phase will end before it has found the cluster (which will result in it declaring itself master of a new cluster with the same name - highly undesirable). Tagging elasticsearch ec2 nodes and then filtering by that tag will resolve this issue.
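A sketch of such a filter (the tag key and value are placeholders):
```
discovery:
    type: ec2
    ec2:
        tag:
            stage: dev
```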
### Automatic Node Attributes
Though this feature does not depend on actually using `ec2` for discovery (it still requires the cloud-aws plugin to be installed), the plugin can automatically add node attributes relating to EC2, for example the availability zone, which can then be used with the awareness allocation feature. To enable it, set `cloud.node.auto_attributes` to `true` in the settings.
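A sketch of enabling the attributes and making shard allocation aware of the zone attribute (this plugin's `Ec2CustomNodeAttributes` publishes it as `aws_availability_zone`):
```
cloud:
    node:
        auto_attributes: true
cluster:
    routing:
        allocation:
            awareness:
                attributes: aws_availability_zone
```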
### Using other EC2 endpoint
If you are using an EC2 API-compatible service, you can set the endpoint to use by setting `cloud.aws.ec2.endpoint`
to your provider's URL.
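A sketch (the hostname is a placeholder):
```
cloud:
    aws:
        ec2:
            endpoint: ec2.example.com
```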
## S3 Repository
The S3 repository uses S3 to store snapshots. An S3 repository can be created using the following command:
```sh
$ curl -XPUT 'http://localhost:9200/_snapshot/my_s3_repository' -d '{
"type": "s3",
"settings": {
"bucket": "my_bucket_name",
"region": "us-west"
}
}'
```
The following settings are supported:
* `bucket`: The name of the bucket to be used for snapshots. (Mandatory)
* `region`: The region where the bucket is located. Defaults to US Standard.
* `endpoint`: The endpoint to the S3 API. Defaults to AWS's default S3 endpoint. Note that setting a region overrides the endpoint setting.
* `protocol`: The protocol to use (`http` or `https`). Defaults to value of `cloud.aws.protocol` or `cloud.aws.s3.protocol`.
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
* `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
* `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that setting a buffer size lower than `5mb` is not allowed since it would prevent the use of the Multipart API and may result in upload errors. Defaults to `5mb`.
* `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.
S3 repositories use the same credentials as the rest of the AWS services provided by this plugin (such as `discovery`).
See [Generic Configuration](#generic-configuration) for details.
Multiple S3 repositories can be created. If the buckets require different credentials, define them as part of the repository settings, as in the sketch below.
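A sketch of registering a second repository with its own credentials and a few of the optional settings (all names and keys are placeholders):
```sh
$ curl -XPUT 'http://localhost:9200/_snapshot/my_other_s3_repository' -d '{
    "type": "s3",
    "settings": {
        "bucket": "my_other_bucket",
        "region": "eu-west",
        "base_path": "snapshots/cluster1",
        "access_key": "<access key>",
        "secret_key": "<secret key>",
        "server_side_encryption": true
    }
}'
```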
### Recommended S3 Permissions
In order to restrict the Elasticsearch snapshot process to the minimum required resources, we recommend using Amazon IAM in conjunction with pre-existing S3 buckets. Here is an example policy which will allow the snapshot access to an S3 bucket named "snaps.example.com". This may be configured through the AWS IAM console, by creating a Custom Policy, and using a Policy Document similar to this (changing snaps.example.com to your bucket name).
```js
{
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:ListBucketMultipartUploads",
                "s3:ListBucketVersions"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::snaps.example.com"
            ]
        },
        {
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::snaps.example.com/*"
            ]
        }
    ],
    "Version": "2012-10-17"
}
```
You may further restrict the permissions by specifying a prefix within the bucket, in this example named "foo".
```js
{
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:ListBucketMultipartUploads",
                "s3:ListBucketVersions"
            ],
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "foo/*"
                    ]
                }
            },
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::snaps.example.com"
            ]
        },
        {
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::snaps.example.com/foo/*"
            ]
        }
    ],
    "Version": "2012-10-17"
}
```
The bucket needs to exist before you can register a repository for snapshots. If you did not create the bucket, the repository registration will fail. If you want elasticsearch to create the bucket instead, you can add the permission to create a specific bucket like this:
```js
{
    "Action": [
        "s3:CreateBucket"
    ],
    "Effect": "Allow",
    "Resource": [
        "arn:aws:s3:::snaps.example.com"
    ]
}
```
### Using other S3 endpoint
If you are using an S3 API-compatible service, you can set a global endpoint by setting `cloud.aws.s3.endpoint`
to your provider's URL. Note that this setting will be used for all S3 repositories.
Different `endpoint`, `region` and `protocol` settings can be set on a per-repository basis (see the [S3 Repository](#s3-repository) section for details).
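A sketch (the hostname is a placeholder):
```
cloud:
    aws:
        s3:
            endpoint: s3.example.com
```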
## Testing
Integration tests in this plugin require a working AWS configuration and are therefore disabled by default. Three buckets and two IAM users have to be created. The first IAM user needs access to two buckets in different regions, and the final bucket is exclusive to the other IAM user. To enable the tests, prepare a config file `elasticsearch.yml` with the following content:
```
cloud:
    aws:
        access_key: AKVAIQBF2RECL7FJWGJQ
        secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br
repositories:
    s3:
        bucket: "bucket_name"
        region: "us-west-2"
        private-bucket:
            bucket: <bucket not accessible by default key>
            access_key: <access key>
            secret_key: <secret key>
        remote-bucket:
            bucket: <bucket in other region>
            region: <region>
        external-bucket:
            bucket: <bucket>
            access_key: <access key>
            secret_key: <secret key>
            endpoint: <endpoint>
            protocol: <protocol>
```
Replace all occurrences of `access_key`, `secret_key`, `endpoint`, `protocol`, `bucket` and `region` with your settings. Please note that the tests will delete all snapshot/restore-related files in the specified buckets.
To run the tests:
```sh
mvn -Dtests.aws=true -Dtests.config=/path/to/config/file/elasticsearch.yml clean test
```
License
-------
This software is licensed under the Apache 2 license, quoted below.
Copyright 2009-2014 Elasticsearch <http://www.elasticsearch.org>
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.

plugins/cloud-aws/pom.xml Normal file

@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>elasticsearch-cloud-aws</artifactId>
    <packaging>jar</packaging>
    <name>Elasticsearch AWS cloud plugin</name>
    <description>The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.</description>
    <parent>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-plugin</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <properties>
        <amazonaws.version>1.9.34</amazonaws.version>
        <tests.jvms>1</tests.jvms>
    </properties>
    <dependencies>
        <!-- AWS SDK -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-ec2</artifactId>
            <version>${amazonaws.version}</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>${amazonaws.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<assembly>
    <id>plugin</id>
    <formats>
        <format>zip</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <useTransitiveFiltering>true</useTransitiveFiltering>
            <excludes>
                <exclude>org.elasticsearch:elasticsearch</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <useTransitiveFiltering>true</useTransitiveFiltering>
        </dependencySet>
    </dependencySets>
</assembly>

@@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import java.util.Locale;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
*
*/
public class AwsEc2Service extends AbstractLifecycleComponent<AwsEc2Service> {
public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
private AmazonEC2Client client;
@Inject
public AwsEc2Service(Settings settings, SettingsFilter settingsFilter, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
super(settings);
settingsFilter.addFilter("cloud.key");
settingsFilter.addFilter("cloud.account");
settingsFilter.addFilter("cloud.aws.access_key");
settingsFilter.addFilter("cloud.aws.secret_key");
// add specific ec2 name resolver
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
}
public synchronized AmazonEC2 client() {
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
String protocol = settings.get("cloud.aws.protocol", "https").toLowerCase(Locale.ROOT);
protocol = settings.get("cloud.aws.ec2.protocol", protocol).toLowerCase(Locale.ROOT);
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = settings.get("cloud.aws.access_key", settings.get("cloud.account"));
String key = settings.get("cloud.aws.secret_key", settings.get("cloud.key"));
String proxyHost = settings.get("cloud.aws.proxy_host");
proxyHost = settings.get("cloud.aws.ec2.proxy_host", proxyHost);
if (proxyHost != null) {
String portString = settings.get("cloud.aws.proxy_port", "80");
portString = settings.get("cloud.aws.ec2.proxy_port", portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
}
// #155: we might have 3rd party users using older EC2 API version
String awsSigner = settings.get("cloud.aws.ec2.signer", settings.get("cloud.aws.signer"));
if (awsSigner != null) {
logger.debug("using AWS API signer [{}]", awsSigner);
try {
AwsSigner.configureSigner(awsSigner, clientConfiguration);
} catch (IllegalArgumentException e) {
logger.warn("wrong signer set for [cloud.aws.ec2.signer] or [cloud.aws.signer]: [{}]", awsSigner);
}
}
AWSCredentialsProvider credentials;
if (account == null && key == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
new InstanceProfileCredentialsProvider()
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
this.client = new AmazonEC2Client(credentials, clientConfiguration);
if (settings.get("cloud.aws.ec2.endpoint") != null) {
String endpoint = settings.get("cloud.aws.ec2.endpoint");
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
client.setEndpoint(endpoint);
} else if (settings.get("cloud.aws.region") != null) {
String region = settings.get("cloud.aws.region").toLowerCase(Locale.ROOT);
String endpoint;
if (region.equals("us-east-1") || region.equals("us-east")) {
endpoint = "ec2.us-east-1.amazonaws.com";
} else if (region.equals("us-west") || region.equals("us-west-1")) {
endpoint = "ec2.us-west-1.amazonaws.com";
} else if (region.equals("us-west-2")) {
endpoint = "ec2.us-west-2.amazonaws.com";
} else if (region.equals("ap-southeast") || region.equals("ap-southeast-1")) {
endpoint = "ec2.ap-southeast-1.amazonaws.com";
} else if (region.equals("ap-southeast-2")) {
endpoint = "ec2.ap-southeast-2.amazonaws.com";
} else if (region.equals("ap-northeast") || region.equals("ap-northeast-1")) {
endpoint = "ec2.ap-northeast-1.amazonaws.com";
} else if (region.equals("eu-west") || region.equals("eu-west-1")) {
endpoint = "ec2.eu-west-1.amazonaws.com";
} else if (region.equals("eu-central") || region.equals("eu-central-1")) {
endpoint = "ec2.eu-central-1.amazonaws.com";
} else if (region.equals("sa-east") || region.equals("sa-east-1")) {
endpoint = "ec2.sa-east-1.amazonaws.com";
} else if (region.equals("cn-north") || region.equals("cn-north-1")) {
endpoint = "ec2.cn-north-1.amazonaws.com.cn";
} else {
throw new IllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
if (endpoint != null) {
logger.debug("using ec2 region [{}], with endpoint [{}]", region, endpoint);
client.setEndpoint(endpoint);
}
}
return this.client;
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
if (client != null) {
client.shutdown();
}
}
}

@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class AwsModule extends AbstractModule {
private final Settings settings;
public static final String S3_SERVICE_TYPE_KEY = "cloud.aws.s3service.type";
public AwsModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(AwsS3Service.class).to(getS3ServiceClass(settings)).asEagerSingleton();
bind(AwsEc2Service.class).asEagerSingleton();
}
public static Class<? extends AwsS3Service> getS3ServiceClass(Settings settings) {
return settings.getAsClass(S3_SERVICE_TYPE_KEY, InternalAwsS3Service.class);
}
}

@@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.component.LifecycleComponent;
/**
*
*/
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
AmazonS3 client();
AmazonS3 client(String endpoint, String protocol, String region, String account, String key);
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
}

@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.SignerFactory;
public class AwsSigner {
private AwsSigner() {
}
/**
* Add an AWS API signer.
* @param signer Signer to use
* @param configuration AWS Client configuration
* @throws IllegalArgumentException if signer does not exist
*/
public static void configureSigner(String signer, ClientConfiguration configuration)
throws IllegalArgumentException {
if (signer == null) {
throw new IllegalArgumentException("[null] signer set");
}
try {
// We check this signer actually exists in AWS SDK
// It throws a IllegalArgumentException if not found
SignerFactory.getSignerByTypeAndService(signer, null);
configuration.setSignerOverride(signer);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("wrong signer set [" + signer + "]");
}
}
}

@@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
*
*/
public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service> implements AwsS3Service {
/**
* (access key, endpoint) -> client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
@Inject
public InternalAwsS3Service(Settings settings) {
super(settings);
}
@Override
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = settings.get("cloud.aws.access_key", settings.get("cloud.account"));
String key = settings.get("cloud.aws.secret_key", settings.get("cloud.key"));
return getClient(endpoint, null, account, key, null);
}
@Override
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return client(endpoint, protocol, region, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
if (region != null && endpoint == null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} else if (endpoint == null) {
endpoint = getDefaultEndpoint();
}
if (account == null || key == null) {
account = settings.get("cloud.aws.access_key", settings.get("cloud.account"));
key = settings.get("cloud.aws.secret_key", settings.get("cloud.key"));
}
return getClient(endpoint, protocol, account, key, maxRetries);
}
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
if (protocol == null) {
protocol = settings.get("cloud.aws.protocol", "https").toLowerCase(Locale.ROOT);
protocol = settings.get("cloud.aws.s3.protocol", protocol).toLowerCase(Locale.ROOT);
}
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new IllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String proxyHost = settings.get("cloud.aws.proxy_host");
proxyHost = settings.get("cloud.aws.s3.proxy_host", proxyHost);
if (proxyHost != null) {
String portString = settings.get("cloud.aws.proxy_port", "80");
portString = settings.get("cloud.aws.s3.proxy_port", portString);
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
}
if (maxRetries != null) {
// If not explicitly set, default to 3 with exponential backoff policy
clientConfiguration.setMaxErrorRetry(maxRetries);
}
// #155: we might have 3rd party users using older S3 API version
String awsSigner = settings.get("cloud.aws.s3.signer", settings.get("cloud.aws.signer"));
if (awsSigner != null) {
logger.debug("using AWS API signer [{}]", awsSigner);
try {
AwsSigner.configureSigner(awsSigner, clientConfiguration);
} catch (IllegalArgumentException e) {
logger.warn("wrong signer set for [cloud.aws.s3.signer] or [cloud.aws.signer]: [{}]", awsSigner);
}
}
AWSCredentialsProvider credentials;
if (account == null && key == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
new InstanceProfileCredentialsProvider()
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
client = new AmazonS3Client(credentials, clientConfiguration);
if (endpoint != null) {
client.setEndpoint(endpoint);
}
clients.put(clientDescriptor, client);
return client;
}
private String getDefaultEndpoint() {
String endpoint = null;
if (settings.get("cloud.aws.s3.endpoint") != null) {
endpoint = settings.get("cloud.aws.s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (settings.get("cloud.aws.region") != null) {
String region = settings.get("cloud.aws.region").toLowerCase(Locale.ROOT);
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
return endpoint;
}
private static String getEndpoint(String region) {
if ("us-east".equals(region) || "us-east-1".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-west".equals(region) || "us-west-1".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
return "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region) || "ap-southeast-1".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
return "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region) || "ap-northeast-1".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region) || "eu-west-1".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("eu-central".equals(region) || "eu-central-1".equals(region)) {
return "s3.eu-central-1.amazonaws.com";
} else if ("sa-east".equals(region) || "sa-east-1".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else if ("cn-north".equals(region) || "cn-north-1".equals(region)) {
return "s3.cn-north-1.amazonaws.com.cn";
} else {
throw new IllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
for (AmazonS3Client client : clients.values()) {
client.shutdown();
}
// Ensure that IdleConnectionReaper is shutdown
IdleConnectionReaper.shutdown();
}
}

@@ -0,0 +1,258 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.util.Base64;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
/**
* DefaultS3OutputStream uploads data to the AWS S3 service using two modes: single and multipart.
* <p/>
* When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
* Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
* <p/>
* Quick facts about S3:
* <p/>
* Maximum object size: 5 TB
* Maximum number of parts per upload: 10,000
* Part numbers: 1 to 10,000 (inclusive)
* Part size: 5 MB to 5 GB, last part can be < 5 MB
* <p/>
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
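* <p/>
* For example, with the default chunk_size of 100mb and buffer_size of 5mb,
* a full 100mb chunk is sent as a multipart upload of twenty 5mb parts,
* well within the 10,000-part limit (an illustrative calculation, not code).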
*/
public class DefaultS3OutputStream extends S3OutputStream {
private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
private static final ESLogger logger = Loggers.getLogger("cloud.aws");
/**
* Multipart Upload API data
*/
private String multipartId;
private int multipartChunks;
private List<PartETag> multiparts;
public DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
}
@Override
public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
if (len > MULTIPART_MAX_SIZE.getBytes()) {
throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
}
if (!closing) {
if (len < getBufferSize()) {
upload(bytes, off, len);
} else {
if (getFlushCount() == 0) {
initializeMultipart();
}
uploadMultipart(bytes, off, len, false);
}
} else {
if (multipartId != null) {
uploadMultipart(bytes, off, len, true);
completeMultipart();
} else {
upload(bytes, off, len);
}
}
}
/**
* Upload data using a single request.
*
* @param bytes
* @param off
* @param len
* @throws IOException
*/
private void upload(byte[] bytes, int off, int len) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
break;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
throw new IOException("Unable to upload object " + getBlobName(), e);
}
}
}
}
}
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
boolean serverSideEncryption) throws AmazonS3Exception {
ObjectMetadata md = new ObjectMetadata();
if (serverSideEncryption) {
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
md.setContentLength(length);
InputStream inputStream = is;
// We try to compute a MD5 while reading it
MessageDigest messageDigest;
try {
messageDigest = MessageDigest.getInstance("MD5");
inputStream = new DigestInputStream(is, messageDigest);
} catch (NoSuchAlgorithmException impossible) {
// Every implementation of the Java platform is required to support MD5 (see MessageDigest)
throw new RuntimeException(impossible);
}
PutObjectResult putObjectResult = blobStore.client().putObject(bucketName, blobName, inputStream, md);
String localMd5 = Base64.encodeAsString(messageDigest.digest());
String remoteMd5 = putObjectResult.getContentMd5();
if (!localMd5.equals(remoteMd5)) {
logger.debug("MD5 local [{}], remote [{}] are not equal...", localMd5, remoteMd5);
throw new AmazonS3Exception("MD5 local [" + localMd5 +
"], remote [" + remoteMd5 +
"] are not equal...");
}
}
private void initializeMultipart() {
int retry = 0;
while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
try {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
}
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
throw e;
}
}
}
}
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName);
if (serverSideEncryption) {
ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
request.setObjectMetadata(md);
}
return blobStore.client().initiateMultipartUpload(request).getUploadId();
}
private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
multiparts.add(partETag);
multipartChunks++;
return;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
abortMultipart();
throw e;
}
}
}
}
}
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
int length, boolean lastPart) throws AmazonS3Exception {
UploadPartRequest request = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(blobName)
.withUploadId(uploadId)
.withPartNumber(multipartChunks)
.withInputStream(is)
.withPartSize(length)
.withLastPart(lastPart);
UploadPartResult response = blobStore.client().uploadPart(request);
return response.getPartETag();
}
private void completeMultipart() {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
return;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
abortMultipart();
throw e;
}
}
}
}
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
throws AmazonS3Exception {
CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
blobStore.client().completeMultipartUpload(request);
}
private void abortMultipart() {
if (multipartId != null) {
try {
doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
} finally {
multipartId = null;
}
}
}
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
throws AmazonS3Exception {
blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
}
}

@@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
/**
*
*/
public class S3BlobContainer extends AbstractBlobContainer {
protected final S3BlobStore blobStore;
protected final String keyPath;
public S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
this.blobStore = blobStore;
String keyPath = path.buildAsString("/");
if (!keyPath.isEmpty()) {
keyPath = keyPath + "/";
}
this.keyPath = keyPath;
}
@Override
public boolean blobExists(String blobName) {
try {
blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
return true;
} catch (AmazonS3Exception e) {
return false;
} catch (Throwable e) {
throw new BlobStoreException("failed to check if blob exists", e);
}
}
@Override
public void deleteBlob(String blobName) throws IOException {
try {
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
} catch (AmazonClientException e) {
throw new IOException("Exception when deleting blob [" + blobName + "]", e);
}
}
@Override
public InputStream openInput(String blobName) throws IOException {
int retry = 0;
while (retry <= blobStore.numberOfRetries()) {
try {
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
retry++;
} else {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
}
}
throw e;
}
}
}
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]");
}
@Override
public OutputStream createOutput(final String blobName) throws IOException {
// DefaultS3OutputStream does buffering & retry logic internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
}
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
ObjectListing prevListing = null;
while (true) {
ObjectListing list;
if (prevListing != null) {
list = blobStore.client().listNextBatchOfObjects(prevListing);
} else {
if (blobNamePrefix != null) {
list = blobStore.client().listObjects(blobStore.bucket(), buildKey(blobNamePrefix));
} else {
list = blobStore.client().listObjects(blobStore.bucket(), keyPath);
}
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
String name = summary.getKey().substring(keyPath.length());
blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
return blobsBuilder.immutableMap();
}
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
try {
CopyObjectRequest request = new CopyObjectRequest(blobStore.bucket(), buildKey(sourceBlobName),
blobStore.bucket(), buildKey(targetBlobName));
if (blobStore.serverSideEncryption()) {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
request.setNewObjectMetadata(objectMetadata);
}
blobStore.client().copyObject(request);
blobStore.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
} catch (AmazonS3Exception e){
throw new IOException(e);
}
}
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}
protected String buildKey(String blobName) {
return keyPath + blobName;
}
}

@@ -0,0 +1,166 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.util.ArrayList;
/**
*
*/
public class S3BlobStore extends AbstractComponent implements BlobStore {
public static final ByteSizeValue MIN_BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
private final AmazonS3 client;
private final String bucket;
private final String region;
private final ByteSizeValue bufferSize;
private final boolean serverSideEncryption;
private final int numberOfRetries;
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
ByteSizeValue bufferSize, int maxRetries) {
super(settings);
this.client = client;
this.bucket = bucket;
this.region = region;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = (bufferSize != null) ? bufferSize : MIN_BUFFER_SIZE;
if (this.bufferSize.getBytes() < MIN_BUFFER_SIZE.getBytes()) {
throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
}
this.numberOfRetries = maxRetries;
if (!client.doesBucketExist(bucket)) {
if (region != null) {
client.createBucket(bucket, region);
} else {
client.createBucket(bucket);
}
}
}
@Override
public String toString() {
return (region == null ? "" : region + "/") + bucket;
}
public AmazonS3 client() {
return client;
}
public String bucket() {
return bucket;
}
public boolean serverSideEncryption() { return serverSideEncryption; }
public int bufferSizeInBytes() {
return bufferSize.bytesAsInt();
}
public int numberOfRetries() {
return numberOfRetries;
}
@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
}
@Override
public void delete(BlobPath path) {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
//we can do at most 1K objects per delete
//We don't know the bucket name until first object listing
DeleteObjectsRequest multiObjectDeleteRequest = null;
ArrayList<KeyVersion> keys = new ArrayList<KeyVersion>();
while (true) {
ObjectListing list;
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
String keyPath = path.buildAsString("/");
if (!keyPath.isEmpty()) {
keyPath = keyPath + "/";
}
list = client.listObjects(bucket, keyPath);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
keys.add(new KeyVersion(summary.getKey()));
//Every 500 objects batch the delete request
if (keys.size() > 500) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
keys.clear();
}
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
if (!keys.isEmpty()) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
}
}
protected boolean shouldRetry(AmazonClientException e) {
if (e instanceof AmazonS3Exception) {
AmazonS3Exception s3e = (AmazonS3Exception)e;
if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
return true;
}
}
return e.isRetryable();
}
@Override
public void close() {
}
}

@@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.io.OutputStream;
/**
* S3OutputStream buffers data before flushing it to S3 through the abstract {@link #flush(byte[], int, int, boolean)} method.
*/
public abstract class S3OutputStream extends OutputStream {
/**
* Limit of upload allowed by AWS S3.
*/
protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
private S3BlobStore blobStore;
private String bucketName;
private String blobName;
private int numberOfRetries;
private boolean serverSideEncryption;
private byte[] buffer;
private int count;
private long length;
private int flushCount = 0;
public S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
this.blobStore = blobStore;
this.bucketName = bucketName;
this.blobName = blobName;
this.numberOfRetries = numberOfRetries;
this.serverSideEncryption = serverSideEncryption;
if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE);
}
if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE);
}
this.buffer = new byte[bufferSizeInBytes];
}
public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException;
private void flushBuffer(boolean closing) throws IOException {
flush(buffer, 0, count, closing);
flushCount++;
count = 0;
}
@Override
public void write(int b) throws IOException {
if (count >= buffer.length) {
flushBuffer(false);
}
buffer[count++] = (byte) b;
length++;
}
@Override
public void close() throws IOException {
if (count > 0) {
flushBuffer(true);
count = 0;
}
}
public S3BlobStore getBlobStore() {
return blobStore;
}
public String getBucketName() {
return bucketName;
}
public String getBlobName() {
return blobName;
}
public int getBufferSize() {
return buffer.length;
}
public int getNumberOfRetries() {
return numberOfRetries;
}
public boolean isServerSideEncryption() {
return serverSideEncryption;
}
public long getLength() {
return length;
}
public int getFlushCount() {
return flushCount;
}
}

@@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.network;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.NetworkService.CustomNameResolver;
import org.elasticsearch.common.settings.Settings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
/**
* Resolves certain ec2 related 'meta' hostnames into an actual hostname
* obtained from ec2 meta-data.
* <p/>
* Valid config values for {@link Ec2HostnameType}s are -
* <ul>
* <li>_ec2_ - maps to privateIpv4</li>
* <li>_ec2:privateIp_ - maps to privateIpv4</li>
* <li>_ec2:privateIpv4_</li>
* <li>_ec2:privateDns_</li>
* <li>_ec2:publicIp_ - maps to publicIpv4</li>
* <li>_ec2:publicIpv4_</li>
* <li>_ec2:publicDns_</li>
* </ul>
*
* @author Paul_Loy (keteracel)
*/
public class Ec2NameResolver extends AbstractComponent implements CustomNameResolver {
/**
* enum that can be added to over time with more meta-data types (such as ipv6 when this is available)
*
* @author Paul_Loy
*/
    private enum Ec2HostnameType {
PRIVATE_IPv4("ec2:privateIpv4", "local-ipv4"),
PRIVATE_DNS("ec2:privateDns", "local-hostname"),
PUBLIC_IPv4("ec2:publicIpv4", "public-ipv4"),
PUBLIC_DNS("ec2:publicDns", "public-hostname"),
// some less verbose defaults
PUBLIC_IP("ec2:publicIp", PUBLIC_IPv4.ec2Name),
PRIVATE_IP("ec2:privateIp", PRIVATE_IPv4.ec2Name),
EC2("ec2", PRIVATE_IPv4.ec2Name);
final String configName;
final String ec2Name;
private Ec2HostnameType(String configName, String ec2Name) {
this.configName = configName;
this.ec2Name = ec2Name;
}
}
/**
* Construct a {@link CustomNameResolver}.
*/
public Ec2NameResolver(Settings settings) {
super(settings);
}
/**
     * @param type the ec2 hostname type to discover.
     * @param warnOnFailure whether a failure to obtain the meta-data is logged at warn (true) or debug (false) level.
     * @return the appropriate host resolved from ec2 meta-data, or {@code null} if it cannot be obtained.
* @see CustomNameResolver#resolveIfPossible(String)
*/
public InetAddress resolve(Ec2HostnameType type, boolean warnOnFailure) {
URLConnection urlConnection = null;
InputStream in = null;
try {
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + type.ec2Name);
logger.debug("obtaining ec2 hostname from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
logger.error("no ec2 metadata returned from {}", url);
return null;
}
return InetAddress.getByName(metadataResult);
} catch (IOException e) {
if (warnOnFailure) {
logger.warn("failed to get metadata for [" + type.configName + "]: " + ExceptionsHelper.detailedMessage(e));
} else {
logger.debug("failed to get metadata for [" + type.configName + "]: " + ExceptionsHelper.detailedMessage(e));
}
return null;
} finally {
IOUtils.closeWhileHandlingException(in);
}
}
@Override
public InetAddress resolveDefault() {
        // intentionally no default: using this resolver, one has to explicitly specify _ec2_ in the network settings
        return null;
}
@Override
public InetAddress resolveIfPossible(String value) {
for (Ec2HostnameType type : Ec2HostnameType.values()) {
if (type.configName.equals(value)) {
return resolve(type, true);
}
}
return null;
}
}
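
Since `resolveDefault()` returns `null`, nothing is resolved unless one of the placeholders listed in the class javadoc is configured explicitly, e.g. in `elasticsearch.yml` (the value shown is one of those types):

```
network:
    host: _ec2:privateIpv4_
```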


@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.node;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class Ec2CustomNodeAttributes extends AbstractComponent implements DiscoveryNodeService.CustomAttributesProvider {
public Ec2CustomNodeAttributes(Settings settings) {
super(settings);
}
@Override
public Map<String, String> buildAttributes() {
if (!settings.getAsBoolean("cloud.node.auto_attributes", false)) {
return null;
}
Map<String, String> ec2Attributes = new HashMap<>();
URLConnection urlConnection;
InputStream in = null;
try {
URL url = new URL(AwsEc2Service.EC2_METADATA_URL + "placement/availability-zone");
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
logger.error("no ec2 metadata returned from {}", url);
return null;
}
ec2Attributes.put("aws_availability_zone", metadataResult);
} catch (IOException e) {
logger.debug("failed to get metadata for [placement/availability-zone]: " + ExceptionsHelper.detailedMessage(e));
} finally {
IOUtils.closeWhileHandlingException(in);
}
return ec2Attributes;
}
}
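
A sketch of how this provider is typically wired up: enabling `cloud.node.auto_attributes` adds the `aws_availability_zone` node attribute, which can then feed Elasticsearch's standard shard allocation awareness (the awareness setting is core Elasticsearch, not part of this class):

```
cloud:
    node:
        auto_attributes: true

cluster:
    routing:
        allocation:
            awareness:
                attributes: aws_availability_zone
```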


@ -0,0 +1,201 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.*;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.transport.TransportService;
import java.util.*;
/**
*
*/
public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
    private enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}
private final TransportService transportService;
private final AmazonEC2 client;
private final Version version;
private final boolean bindAnyGroup;
private final Set<String> groups;
private final Map<String, String> tags;
private final Set<String> availabilityZones;
private final HostType hostType;
@Inject
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) {
super(settings);
this.transportService = transportService;
this.client = awsEc2Service.client();
this.version = version;
this.hostType = HostType.valueOf(settings.get("discovery.ec2.host_type", "private_ip").toUpperCase(Locale.ROOT));
this.bindAnyGroup = settings.getAsBoolean("discovery.ec2.any_group", true);
this.groups = new HashSet<>();
groups.addAll(Arrays.asList(settings.getAsArray("discovery.ec2.groups")));
this.tags = settings.getByPrefix("discovery.ec2.tag.").getAsMap();
        Set<String> availabilityZones = new HashSet<>();
availabilityZones.addAll(Arrays.asList(settings.getAsArray("discovery.ec2.availability_zones")));
if (settings.get("discovery.ec2.availability_zones") != null) {
availabilityZones.addAll(Strings.commaDelimitedListToSet(settings.get("discovery.ec2.availability_zones")));
}
this.availabilityZones = availabilityZones;
if (logger.isDebugEnabled()) {
logger.debug("using host_type [{}], tags [{}], groups [{}] with any_group [{}], availability_zones [{}]", hostType, tags, groups, bindAnyGroup, availabilityZones);
}
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = new ArrayList<>();
DescribeInstancesResult descInstances;
try {
// Query EC2 API based on AZ, instance state, and tag.
// NOTE: we don't filter by security group during the describe instances request for two reasons:
// 1. differences in VPCs require different parameters during query (ID vs Name)
// 2. We want to use two different strategies: (all security groups vs. any security groups)
descInstances = client.describeInstances(buildDescribeInstancesRequest());
} catch (AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
return discoNodes;
}
logger.trace("building dynamic unicast discovery nodes...");
for (Reservation reservation : descInstances.getReservations()) {
for (Instance instance : reservation.getInstances()) {
// lets see if we can filter based on groups
if (!groups.isEmpty()) {
List<GroupIdentifier> instanceSecurityGroups = instance.getSecurityGroups();
ArrayList<String> securityGroupNames = new ArrayList<String>();
ArrayList<String> securityGroupIds = new ArrayList<String>();
for (GroupIdentifier sg : instanceSecurityGroups) {
securityGroupNames.add(sg.getGroupName());
securityGroupIds.add(sg.getGroupId());
}
if (bindAnyGroup) {
// We check if we can find at least one group name or one group id in groups.
if (Collections.disjoint(securityGroupNames, groups)
&& Collections.disjoint(securityGroupIds, groups)) {
logger.trace("filtering out instance {} based on groups {}, not part of {}", instance.getInstanceId(), instanceSecurityGroups, groups);
// continue to the next instance
continue;
}
} else {
                        // We need to match all group names or group ids, otherwise we ignore this instance
if (!(securityGroupNames.containsAll(groups) || securityGroupIds.containsAll(groups))) {
logger.trace("filtering out instance {} based on groups {}, does not include all of {}", instance.getInstanceId(), instanceSecurityGroups, groups);
// continue to the next instance
continue;
}
}
}
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicIpAddress();
break;
}
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
                        // we limit the number of addresses to LIMIT_PORTS_COUNT; it makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], version.minimumCompatibilityVersion()));
}
} catch (Exception e) {
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);
}
} else {
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
}
}
}
logger.debug("using dynamic discovery nodes {}", discoNodes);
return discoNodes;
}
private DescribeInstancesRequest buildDescribeInstancesRequest() {
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.withFilters(
new Filter("instance-state-name").withValues("running", "pending")
);
for (Map.Entry<String, String> tagFilter : tags.entrySet()) {
// for a given tag key, OR relationship for multiple different values
describeInstancesRequest.withFilters(
new Filter("tag:" + tagFilter.getKey()).withValues(tagFilter.getValue())
);
}
if (!availabilityZones.isEmpty()) {
// OR relationship amongst multiple values of the availability-zone filter
describeInstancesRequest.withFilters(
new Filter("availability-zone").withValues(availabilityZones)
);
}
return describeInstancesRequest;
}
}
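
All of the `discovery.ec2.*` settings read in the constructor above map to configuration like the following (values are illustrative; `tag.stage` stands for any `discovery.ec2.tag.*` key):

```
discovery:
    ec2:
        host_type: private_ip
        any_group: false
        groups: sg-elasticsearch
        availability_zones: us-east-1a,us-east-1b
        tag:
            stage: production
```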


@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class Ec2Discovery extends ZenDiscovery {
@Inject
public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService,
DiscoverySettings discoverySettings,
ElectMasterService electMasterService, @ClusterDynamicSettings DynamicSettings dynamicSettings) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
pingService, electMasterService, discoverySettings, dynamicSettings);
}
}


@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscoveryModule;
/**
*
*/
public class Ec2DiscoveryModule extends ZenDiscoveryModule {
@Inject
public Ec2DiscoveryModule(Settings settings) {
if (settings.getAsBoolean("cloud.enabled", true)) {
addUnicastHostProvider(AwsEc2UnicastHostsProvider.class);
}
}
@Override
protected void bindDiscovery() {
bind(Discovery.class).to(Ec2Discovery.class).asEagerSingleton();
}
}
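
This module is selected by pointing the discovery type at it; with the plugin installed (and `cloud.enabled` left at its default of `true`), that is:

```
discovery:
    type: ec2
```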


@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.cloud.aws;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsModule;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.s3.S3Repository;
import org.elasticsearch.repositories.s3.S3RepositoryModule;
import java.util.ArrayList;
import java.util.Collection;
/**
*
*/
public class CloudAwsPlugin extends AbstractPlugin {
private final Settings settings;
public CloudAwsPlugin(Settings settings) {
this.settings = settings;
}
@Override
public String name() {
return "cloud-aws";
}
@Override
public String description() {
return "Cloud AWS Plugin";
}
@Override
public Collection<Module> modules(Settings settings) {
Collection<Module> modules = new ArrayList<>();
if (settings.getAsBoolean("cloud.enabled", true)) {
modules.add(new AwsModule(settings));
}
return modules;
}
@Override
public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> services = new ArrayList<>();
if (settings.getAsBoolean("cloud.enabled", true)) {
services.add(AwsModule.getS3ServiceClass(settings));
services.add(AwsEc2Service.class);
}
return services;
}
public void onModule(RepositoriesModule repositoriesModule) {
if (settings.getAsBoolean("cloud.enabled", true)) {
repositoriesModule.registerRepository(S3Repository.TYPE, S3RepositoryModule.class);
}
}
}
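
Every registration above is guarded by the same flag, so the whole plugin can be neutralized without uninstalling it:

```
cloud:
    enabled: false
```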


@ -0,0 +1,167 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.util.Locale;
/**
 * S3 implementation of the BlobStoreRepository
 * <p/>
 * The S3 repository supports the following settings
 * <dl>
 * <dt>{@code bucket}</dt><dd>S3 bucket</dd>
 * <dt>{@code region}</dt><dd>S3 region. Defaults to us-east</dd>
 * <dt>{@code base_path}</dt><dd>Specifies the path within the bucket to the repository data. Defaults to the root directory.</dd>
 * <dt>{@code concurrent_streams}</dt><dd>Number of concurrent read/write streams (per repository on each node). Defaults to 5.</dd>
 * <dt>{@code chunk_size}</dt><dd>Large files can be divided into chunks. This parameter specifies the chunk size. Defaults to not chunked.</dd>
 * <dt>{@code compress}</dt><dd>If set to true, metadata files will be stored compressed. Defaults to false.</dd>
 * </dl>
 */
public class S3Repository extends BlobStoreRepository {
public final static String TYPE = "s3";
private final S3BlobStore blobStore;
private final BlobPath basePath;
private ByteSizeValue chunkSize;
private boolean compress;
/**
     * Constructs a new S3 repository
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
* @param s3Service S3 service
* @throws IOException
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
String bucket = repositorySettings.settings().get("bucket", settings.get("repositories.s3.bucket"));
if (bucket == null) {
throw new RepositoryException(name.name(), "No bucket defined for s3 gateway");
}
String endpoint = repositorySettings.settings().get("endpoint", settings.get("repositories.s3.endpoint"));
String protocol = repositorySettings.settings().get("protocol", settings.get("repositories.s3.protocol"));
String region = repositorySettings.settings().get("region", settings.get("repositories.s3.region"));
if (region == null) {
            // Region setting is not set - use the global region setting
String regionSetting = repositorySettings.settings().get("cloud.aws.region", settings.get("cloud.aws.region"));
if (regionSetting != null) {
regionSetting = regionSetting.toLowerCase(Locale.ENGLISH);
if ("us-east".equals(regionSetting) || "us-east-1".equals(regionSetting)) {
                    // Default region (us-east-1) - leave region as null
region = null;
} else if ("us-west".equals(regionSetting) || "us-west-1".equals(regionSetting)) {
region = "us-west-1";
} else if ("us-west-2".equals(regionSetting)) {
region = "us-west-2";
} else if ("ap-southeast".equals(regionSetting) || "ap-southeast-1".equals(regionSetting)) {
region = "ap-southeast-1";
} else if ("ap-southeast-2".equals(regionSetting)) {
region = "ap-southeast-2";
} else if ("ap-northeast".equals(regionSetting) || "ap-northeast-1".equals(regionSetting)) {
region = "ap-northeast-1";
} else if ("eu-west".equals(regionSetting) || "eu-west-1".equals(regionSetting)) {
region = "eu-west-1";
} else if ("eu-central".equals(regionSetting) || "eu-central-1".equals(regionSetting)) {
region = "eu-central-1";
} else if ("sa-east".equals(regionSetting) || "sa-east-1".equals(regionSetting)) {
region = "sa-east-1";
} else if ("cn-north".equals(regionSetting) || "cn-north-1".equals(regionSetting)) {
region = "cn-north-1";
}
}
}
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", settings.getAsBoolean("repositories.s3.server_side_encryption", false));
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", settings.getAsBytesSize("repositories.s3.buffer_size", null));
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", settings.getAsInt("repositories.s3.max_retries", 3));
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("repositories.s3.compress", false));
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
String basePath = repositorySettings.settings().get("base_path", null);
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
for(String elem : Strings.splitStringToArray(basePath, '/')) {
path = path.add(elem);
}
this.basePath = path;
} else {
this.basePath = BlobPath.cleanPath();
}
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;
}
@Override
protected BlobPath basePath() {
return basePath;
}
/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}
/**
* {@inheritDoc}
*/
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
}
}
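
A sketch of registering such a repository through the snapshot API, using the settings this constructor reads (bucket, region and base path are placeholders):

```sh
curl -XPUT 'http://localhost:9200/_snapshot/my_s3_repository' -d '{
    "type": "s3",
    "settings": {
        "bucket": "my-bucket",
        "region": "eu-west-1",
        "base_path": "snapshots/cluster-one",
        "chunk_size": "100mb"
    }
}'
```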


@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.Repository;
/**
* S3 repository module
*/
public class S3RepositoryModule extends AbstractModule {
public S3RepositoryModule() {
super();
}
/**
* {@inheritDoc}
*/
@Override
protected void configure() {
bind(Repository.class).to(S3Repository.class).asEagerSingleton();
bind(IndexShardRepository.class).to(BlobStoreIndexShardRepository.class).asEagerSingleton();
}
}


@ -0,0 +1,3 @@
plugin=org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin
version=${project.version}


@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
public class AWSSignersTest extends ElasticsearchTestCase {
@Test
public void testSigners() {
assertThat(signerTester(null), is(false));
assertThat(signerTester("QueryStringSignerType"), is(true));
assertThat(signerTester("AWS3SignerType"), is(true));
assertThat(signerTester("AWS4SignerType"), is(true));
assertThat(signerTester("NoOpSignerType"), is(true));
assertThat(signerTester("UndefinedSigner"), is(false));
}
/**
* Test a signer configuration
* @param signer signer name
* @return true if successful, false otherwise
*/
private boolean signerTester(String signer) {
try {
AwsSigner.configureSigner(signer, new ClientConfiguration());
return true;
} catch (IllegalArgumentException e) {
return false;
}
}
}
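
The signer names exercised here are AWS SDK signer types. Assuming the plugin exposes them through a `cloud.aws.signer` setting, as the rest of this plugin's `cloud.aws.*` settings suggest (this setting name is an assumption, not confirmed by the code above), overriding the signer would look like:

```
cloud:
    aws:
        signer: AWS4SignerType
```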


@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.FailedToResolveConfigException;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ThirdParty;
import org.junit.After;
import org.junit.Before;
import java.lang.annotation.Documented;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.HashMap;
import java.util.Map;
/**
* Base class for AWS tests that require credentials.
* <p>
* You must specify {@code -Dtests.thirdparty=true -Dtests.config=/path/to/config}
* in order to run these tests.
*/
@ThirdParty
public abstract class AbstractAwsTest extends ElasticsearchIntegrationTest {
/**
     * These properties are set by the AWS SDK v1.9.4 and, if not ignored,
     * lead to test failures (see AbstractRandomizedTest#IGNORED_INVARIANT_PROPERTIES)
*/
private static final String[] AWS_INVARIANT_PROPERTIES = {
"com.sun.org.apache.xml.internal.dtm.DTMManager",
"javax.xml.parsers.DocumentBuilderFactory"
};
private Map<String, String> properties = new HashMap<>();
@Before
public void saveProperties() {
for (String p : AWS_INVARIANT_PROPERTIES) {
properties.put(p, System.getProperty(p));
}
}
@After
public void restoreProperties() {
for (String p : AWS_INVARIANT_PROPERTIES) {
if (properties.get(p) != null) {
System.setProperty(p, properties.get(p));
} else {
System.clearProperty(p);
}
}
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("path.home", createTempDir())
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.put(AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1);
Environment environment = new Environment(settings.build());
// if explicit, just load it and don't load from env
try {
if (Strings.hasText(System.getProperty("tests.config"))) {
settings.loadFromUrl(environment.resolveConfig(System.getProperty("tests.config")));
} else {
throw new IllegalStateException("to run integration tests, you need to set -Dtest.thirdparty=true and -Dtests.config=/path/to/elasticsearch.yml");
}
} catch (FailedToResolveConfigException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception);
}
return settings.build();
}
}
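
A minimal `tests.config` file for these third-party tests only needs the AWS credentials (placeholders shown):

```
cloud:
    aws:
        access_key: YOUR_ACCESS_KEY
        secret_key: YOUR_SECRET_KEY
```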


@ -0,0 +1,582 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.HttpMethod;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.S3ResponseMetadata;
import com.amazonaws.services.s3.model.*;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
import java.util.List;
import org.elasticsearch.common.SuppressForbidden;
/**
*
*/
@SuppressForbidden(reason = "implements AWS api that uses java.io.File!")
public class AmazonS3Wrapper implements AmazonS3 {
protected AmazonS3 delegate;
public AmazonS3Wrapper(AmazonS3 delegate) {
this.delegate = delegate;
}
@Override
public void setEndpoint(String endpoint) {
delegate.setEndpoint(endpoint);
}
@Override
public void setRegion(Region region) throws IllegalArgumentException {
delegate.setRegion(region);
}
@Override
public void setS3ClientOptions(S3ClientOptions clientOptions) {
delegate.setS3ClientOptions(clientOptions);
}
@Override
public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass) throws AmazonClientException, AmazonServiceException {
delegate.changeObjectStorageClass(bucketName, key, newStorageClass);
}
@Override
public void setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation) throws AmazonClientException, AmazonServiceException {
delegate.setObjectRedirectLocation(bucketName, key, newRedirectLocation);
}
@Override
public ObjectListing listObjects(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(bucketName);
}
@Override
public ObjectListing listObjects(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(bucketName, prefix);
}
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(listObjectsRequest);
}
@Override
public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing) throws AmazonClientException, AmazonServiceException {
return delegate.listNextBatchOfObjects(previousObjectListing);
}
@Override
public VersionListing listVersions(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(bucketName, prefix);
}
@Override
public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing) throws AmazonClientException, AmazonServiceException {
return delegate.listNextBatchOfVersions(previousVersionListing);
}
@Override
public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker, String delimiter, Integer maxResults) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(bucketName, prefix, keyMarker, versionIdMarker, delimiter, maxResults);
}
@Override
public VersionListing listVersions(ListVersionsRequest listVersionsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(listVersionsRequest);
}
@Override
public Owner getS3AccountOwner() throws AmazonClientException, AmazonServiceException {
return delegate.getS3AccountOwner();
}
@Override
public boolean doesBucketExist(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.doesBucketExist(bucketName);
}
@Override
public List<Bucket> listBuckets() throws AmazonClientException, AmazonServiceException {
return delegate.listBuckets();
}
@Override
public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listBuckets(listBucketsRequest);
}
@Override
public String getBucketLocation(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLocation(bucketName);
}
@Override
public String getBucketLocation(GetBucketLocationRequest getBucketLocationRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLocation(getBucketLocationRequest);
}
@Override
public Bucket createBucket(CreateBucketRequest createBucketRequest) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(createBucketRequest);
}
@Override
public Bucket createBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName);
}
@Override
public Bucket createBucket(String bucketName, com.amazonaws.services.s3.model.Region region) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName, region);
}
@Override
public Bucket createBucket(String bucketName, String region) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName, region);
}
@Override
public AccessControlList getObjectAcl(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectAcl(bucketName, key);
}
@Override
public AccessControlList getObjectAcl(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectAcl(bucketName, key, versionId);
}
@Override
public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, versionId, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, versionId, acl);
}
@Override
public void setObjectAcl(SetObjectAclRequest setObjectAclRequest) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(setObjectAclRequest);
}
@Override
public AccessControlList getBucketAcl(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketAcl(bucketName);
}
@Override
public void setBucketAcl(SetBucketAclRequest setBucketAclRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(setBucketAclRequest);
}
@Override
public AccessControlList getBucketAcl(GetBucketAclRequest getBucketAclRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketAcl(getBucketAclRequest);
}
@Override
public void setBucketAcl(String bucketName, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(bucketName, acl);
}
@Override
public void setBucketAcl(String bucketName, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(bucketName, acl);
}
@Override
public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectMetadata(bucketName, key);
}
@Override
public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectMetadata(getObjectMetadataRequest);
}
@Override
public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(bucketName, key);
}
@Override
public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(getObjectRequest);
}
@Override
public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(getObjectRequest, destinationFile);
}
@Override
public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucket(deleteBucketRequest);
}
@Override
public void deleteBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucket(bucketName);
}
@Override
public void setBucketReplicationConfiguration(String bucketName, BucketReplicationConfiguration configuration) throws AmazonServiceException, AmazonClientException {
delegate.setBucketReplicationConfiguration(bucketName, configuration);
}
@Override
public void setBucketReplicationConfiguration(SetBucketReplicationConfigurationRequest setBucketReplicationConfigurationRequest) throws AmazonServiceException, AmazonClientException {
delegate.setBucketReplicationConfiguration(setBucketReplicationConfigurationRequest);
}
@Override
public BucketReplicationConfiguration getBucketReplicationConfiguration(String bucketName) throws AmazonServiceException, AmazonClientException {
return delegate.getBucketReplicationConfiguration(bucketName);
}
@Override
public void deleteBucketReplicationConfiguration(String bucketName) throws AmazonServiceException, AmazonClientException {
delegate.deleteBucketReplicationConfiguration(bucketName);
}
@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(putObjectRequest);
}
@Override
public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(bucketName, key, file);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(bucketName, key, input, metadata);
}
@Override
public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, String destinationKey) throws AmazonClientException, AmazonServiceException {
return delegate.copyObject(sourceBucketName, sourceKey, destinationBucketName, destinationKey);
}
@Override
public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.copyObject(copyObjectRequest);
}
@Override
public CopyPartResult copyPart(CopyPartRequest copyPartRequest) throws AmazonClientException, AmazonServiceException {
return delegate.copyPart(copyPartRequest);
}
@Override
public void deleteObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
delegate.deleteObject(bucketName, key);
}
@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteObject(deleteObjectRequest);
}
@Override
public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.deleteObjects(deleteObjectsRequest);
}
@Override
public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
delegate.deleteVersion(bucketName, key, versionId);
}
@Override
public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteVersion(deleteVersionRequest);
}
@Override
public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLoggingConfiguration(bucketName);
}
@Override
public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketLoggingConfiguration(setBucketLoggingConfigurationRequest);
}
@Override
public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketVersioningConfiguration(bucketName);
}
@Override
public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest setBucketVersioningConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketVersioningConfiguration(setBucketVersioningConfigurationRequest);
}
@Override
public BucketLifecycleConfiguration getBucketLifecycleConfiguration(String bucketName) {
return delegate.getBucketLifecycleConfiguration(bucketName);
}
@Override
public void setBucketLifecycleConfiguration(String bucketName, BucketLifecycleConfiguration bucketLifecycleConfiguration) {
delegate.setBucketLifecycleConfiguration(bucketName, bucketLifecycleConfiguration);
}
@Override
public void setBucketLifecycleConfiguration(SetBucketLifecycleConfigurationRequest setBucketLifecycleConfigurationRequest) {
delegate.setBucketLifecycleConfiguration(setBucketLifecycleConfigurationRequest);
}
@Override
public void deleteBucketLifecycleConfiguration(String bucketName) {
delegate.deleteBucketLifecycleConfiguration(bucketName);
}
@Override
public void deleteBucketLifecycleConfiguration(DeleteBucketLifecycleConfigurationRequest deleteBucketLifecycleConfigurationRequest) {
delegate.deleteBucketLifecycleConfiguration(deleteBucketLifecycleConfigurationRequest);
}
@Override
public BucketCrossOriginConfiguration getBucketCrossOriginConfiguration(String bucketName) {
return delegate.getBucketCrossOriginConfiguration(bucketName);
}
@Override
public void setBucketCrossOriginConfiguration(String bucketName, BucketCrossOriginConfiguration bucketCrossOriginConfiguration) {
delegate.setBucketCrossOriginConfiguration(bucketName, bucketCrossOriginConfiguration);
}
@Override
public void setBucketCrossOriginConfiguration(SetBucketCrossOriginConfigurationRequest setBucketCrossOriginConfigurationRequest) {
delegate.setBucketCrossOriginConfiguration(setBucketCrossOriginConfigurationRequest);
}
@Override
public void deleteBucketCrossOriginConfiguration(String bucketName) {
delegate.deleteBucketCrossOriginConfiguration(bucketName);
}
@Override
public void deleteBucketCrossOriginConfiguration(DeleteBucketCrossOriginConfigurationRequest deleteBucketCrossOriginConfigurationRequest) {
delegate.deleteBucketCrossOriginConfiguration(deleteBucketCrossOriginConfigurationRequest);
}
@Override
public BucketTaggingConfiguration getBucketTaggingConfiguration(String bucketName) {
return delegate.getBucketTaggingConfiguration(bucketName);
}
@Override
public void setBucketTaggingConfiguration(String bucketName, BucketTaggingConfiguration bucketTaggingConfiguration) {
delegate.setBucketTaggingConfiguration(bucketName, bucketTaggingConfiguration);
}
@Override
public void setBucketTaggingConfiguration(SetBucketTaggingConfigurationRequest setBucketTaggingConfigurationRequest) {
delegate.setBucketTaggingConfiguration(setBucketTaggingConfigurationRequest);
}
@Override
public void deleteBucketTaggingConfiguration(String bucketName) {
delegate.deleteBucketTaggingConfiguration(bucketName);
}
@Override
public void deleteBucketTaggingConfiguration(DeleteBucketTaggingConfigurationRequest deleteBucketTaggingConfigurationRequest) {
delegate.deleteBucketTaggingConfiguration(deleteBucketTaggingConfigurationRequest);
}
@Override
public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketNotificationConfiguration(bucketName);
}
@Override
public void setBucketNotificationConfiguration(SetBucketNotificationConfigurationRequest setBucketNotificationConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketNotificationConfiguration(setBucketNotificationConfigurationRequest);
}
@Override
public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration bucketNotificationConfiguration) throws AmazonClientException, AmazonServiceException {
delegate.setBucketNotificationConfiguration(bucketName, bucketNotificationConfiguration);
}
@Override
public BucketWebsiteConfiguration getBucketWebsiteConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketWebsiteConfiguration(bucketName);
}
@Override
public BucketWebsiteConfiguration getBucketWebsiteConfiguration(GetBucketWebsiteConfigurationRequest getBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketWebsiteConfiguration(getBucketWebsiteConfigurationRequest);
}
@Override
public void setBucketWebsiteConfiguration(String bucketName, BucketWebsiteConfiguration configuration) throws AmazonClientException, AmazonServiceException {
delegate.setBucketWebsiteConfiguration(bucketName, configuration);
}
@Override
public void setBucketWebsiteConfiguration(SetBucketWebsiteConfigurationRequest setBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketWebsiteConfiguration(setBucketWebsiteConfigurationRequest);
}
@Override
public void deleteBucketWebsiteConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketWebsiteConfiguration(bucketName);
}
@Override
public void deleteBucketWebsiteConfiguration(DeleteBucketWebsiteConfigurationRequest deleteBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketWebsiteConfiguration(deleteBucketWebsiteConfigurationRequest);
}
@Override
public BucketPolicy getBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketPolicy(bucketName);
}
@Override
public BucketPolicy getBucketPolicy(GetBucketPolicyRequest getBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketPolicy(getBucketPolicyRequest);
}
@Override
public void setBucketPolicy(String bucketName, String policyText) throws AmazonClientException, AmazonServiceException {
delegate.setBucketPolicy(bucketName, policyText);
}
@Override
public void setBucketPolicy(SetBucketPolicyRequest setBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketPolicy(setBucketPolicyRequest);
}
@Override
public void deleteBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketPolicy(bucketName);
}
@Override
public void deleteBucketPolicy(DeleteBucketPolicyRequest deleteBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketPolicy(deleteBucketPolicyRequest);
}
@Override
public URL generatePresignedUrl(String bucketName, String key, Date expiration) throws AmazonClientException {
return delegate.generatePresignedUrl(bucketName, key, expiration);
}
@Override
public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method) throws AmazonClientException {
return delegate.generatePresignedUrl(bucketName, key, expiration, method);
}
@Override
public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest) throws AmazonClientException {
return delegate.generatePresignedUrl(generatePresignedUrlRequest);
}
@Override
public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.initiateMultipartUpload(request);
}
@Override
public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.uploadPart(request);
}
@Override
public PartListing listParts(ListPartsRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.listParts(request);
}
@Override
public void abortMultipartUpload(AbortMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
delegate.abortMultipartUpload(request);
}
@Override
public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.completeMultipartUpload(request);
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.listMultipartUploads(request);
}
@Override
public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
return delegate.getCachedResponseMetadata(request);
}
@Override
public void restoreObject(RestoreObjectRequest copyGlacierObjectRequest) throws AmazonServiceException {
delegate.restoreObject(copyGlacierObjectRequest);
}
@Override
public void restoreObject(String bucketName, String key, int expirationInDays) throws AmazonServiceException {
delegate.restoreObject(bucketName, key, expirationInDays);
}
@Override
public void enableRequesterPays(String bucketName) throws AmazonServiceException, AmazonClientException {
delegate.enableRequesterPays(bucketName);
}
@Override
public void disableRequesterPays(String bucketName) throws AmazonServiceException, AmazonClientException {
delegate.disableRequesterPays(bucketName);
}
@Override
public boolean isRequesterPaysEnabled(String bucketName) throws AmazonServiceException, AmazonClientException {
return delegate.isRequesterPaysEnabled(bucketName);
}
}


@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
/**
 * An {@link AmazonS3} wrapper that randomly injects read and write failures, at the
 * rates configured via the {@code cloud.aws.test.read_failures} and
 * {@code cloud.aws.test.write_failures} settings.
 */
public class TestAmazonS3 extends AmazonS3Wrapper {
protected final ESLogger logger = Loggers.getLogger(getClass());
private double writeFailureRate = 0.0;
private double readFailureRate = 0.0;
private String randomPrefix;
ConcurrentMap<String, AtomicLong> accessCounts = new ConcurrentHashMap<String, AtomicLong>();
private long incrementAndGet(String path) {
// Count accesses per path so that shouldFail can vary deterministically per attempt.
AtomicLong value = accessCounts.get(path);
if (value == null) {
// putIfAbsent returns the previous value, or null if our counter (already at 1) won the race.
value = accessCounts.putIfAbsent(path, new AtomicLong(1));
}
if (value != null) {
return value.incrementAndGet();
}
return 1;
}
public TestAmazonS3(AmazonS3 delegate, Settings settings) {
super(delegate);
randomPrefix = settings.get("cloud.aws.test.random");
writeFailureRate = settings.getAsDouble("cloud.aws.test.write_failures", 0.0);
readFailureRate = settings.getAsDouble("cloud.aws.test.read_failures", 0.0);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
if (shouldFail(bucketName, key, writeFailureRate)) {
// Consume a random portion of the stream before failing, simulating a partially completed write.
long length = metadata.getContentLength();
long partToRead = (long) (length * randomDouble());
byte[] buffer = new byte[1024];
for (long cur = 0; cur < partToRead; cur += buffer.length) {
try {
input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
} catch (IOException ex) {
throw new ElasticsearchException("cannot read input stream", ex);
}
}
logger.info("--> random write failure on putObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
AmazonS3Exception ex = new AmazonS3Exception("Random S3 exception");
ex.setStatusCode(400);
ex.setErrorCode("RequestTimeout");
throw ex;
} else {
return super.putObject(bucketName, key, input, metadata);
}
}
@Override
public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
if (shouldFail(request.getBucketName(), request.getKey(), writeFailureRate)) {
long length = request.getPartSize();
long partToRead = (long) (length * randomDouble());
byte[] buffer = new byte[1024];
for (long cur = 0; cur < partToRead; cur += buffer.length) {
try (InputStream input = request.getInputStream()){
input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
} catch (IOException ex) {
throw new ElasticsearchException("cannot read input stream", ex);
}
}
logger.info("--> random write failure on uploadPart method: throwing an exception for [bucket={}, key={}]", request.getBucketName(), request.getKey());
AmazonS3Exception ex = new AmazonS3Exception("Random S3 write exception");
ex.setStatusCode(400);
ex.setErrorCode("RequestTimeout");
throw ex;
} else {
return super.uploadPart(request);
}
}
@Override
public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
if (shouldFail(bucketName, key, readFailureRate)) {
logger.info("--> random read failure on getObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
AmazonS3Exception ex = new AmazonS3Exception("Random S3 read exception");
ex.setStatusCode(404);
throw ex;
} else {
return super.getObject(bucketName, key);
}
}
private boolean shouldFail(String bucketName, String key, double probability) {
if (probability > 0.0) {
// Build a per-attempt path (prefix + bucket + key + attempt number) so that the
// failure decision is stable for a given random seed and access count.
String path = randomPrefix + "-" + bucketName + "+" + key;
path += "/" + incrementAndGet(path);
// Map the uniform 32-bit hash onto a Bernoulli trial with the given probability.
return Math.abs(hashCode(path)) < Integer.MAX_VALUE * probability;
} else {
return false;
}
}
private int hashCode(String path) {
try {
// Fold the first four bytes of the MD5 digest into a uniformly distributed int.
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] bytes = digest.digest(path.getBytes("UTF-8"));
int i = 0;
return ((bytes[i++] & 0xFF) << 24) | ((bytes[i++] & 0xFF) << 16)
| ((bytes[i++] & 0xFF) << 8) | (bytes[i++] & 0xFF);
} catch (UnsupportedEncodingException ex) {
throw new ElasticsearchException("cannot calculate hashcode", ex);
} catch (NoSuchAlgorithmException ex) {
throw new ElasticsearchException("cannot calculate hashcode", ex);
}
}
}
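
Taken together, the overrides above let a test inject deterministic, seed-stable failures purely through settings. A hedged sketch of the wiring, where the class name, the wrapped client and the exact rates are placeholders:

```java
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.settings.Settings;

// Sketch only: wrap a real client so a fraction of reads and writes fail.
public class TestAmazonS3Sketch {
    static AmazonS3 flaky(AmazonS3 realClient) {
        Settings settings = Settings.builder()
                .put("cloud.aws.test.random", "seed-prefix")  // keys the MD5-based failure decision
                .put("cloud.aws.test.write_failures", 0.1)    // ~10% of puts/parts throw RequestTimeout (400)
                .put("cloud.aws.test.read_failures", 0.05)    // ~5% of gets throw a 404
                .build();
        return new TestAmazonS3(realClient, settings);
    }
}
```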

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.IdentityHashMap;
/**
 * An {@link InternalAwsS3Service} that hands out {@link TestAmazonS3} wrappers,
 * cached per underlying client instance.
 */
public class TestAwsS3Service extends InternalAwsS3Service {
IdentityHashMap<AmazonS3, TestAmazonS3> clients = new IdentityHashMap<AmazonS3, TestAmazonS3>();
@Inject
public TestAwsS3Service(Settings settings) {
super(settings);
}
@Override
public synchronized AmazonS3 client() {
return cachedWrapper(super.client());
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
}
private AmazonS3 cachedWrapper(AmazonS3 client) {
TestAmazonS3 wrapper = clients.get(client);
if (wrapper == null) {
wrapper = new TestAmazonS3(client, settings);
clients.put(client, wrapper);
}
return wrapper;
}
@Override
protected synchronized void doClose() throws ElasticsearchException {
super.doClose();
clients.clear();
}
}
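
The cache above is keyed by reference on purpose: two clients that compare equal would still be distinct connections, and each needs its own wrapper so the per-path access counters stay per-client. A small illustration of the `IdentityHashMap` semantics this relies on (class and values are placeholders):

```java
import java.util.IdentityHashMap;

// Demonstrates reference-keyed lookups: equal but distinct keys get separate entries.
public class IdentityCacheSketch {
    public static void main(String[] args) {
        IdentityHashMap<String, String> cache = new IdentityHashMap<String, String>();
        String a = new String("client");
        String b = new String("client"); // a.equals(b) is true, but a != b
        cache.put(a, "wrapper-a");
        cache.put(b, "wrapper-b");
        System.out.println(cache.size()); // 2: lookups use ==, not equals()
    }
}
```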

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.PartETag;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.elasticsearch.common.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
public class MockDefaultS3OutputStream extends DefaultS3OutputStream {
private ByteArrayOutputStream out = new ByteArrayOutputStream();
private boolean initialized = false;
private boolean completed = false;
private boolean aborted = false;
private int numberOfUploadRequests = 0;
public MockDefaultS3OutputStream(int bufferSizeInBytes) {
// The blob store is never touched by the overridden do* methods, so null is safe here.
super(null, "test-bucket", "test-blobname", bufferSizeInBytes, 3, false);
}
@Override
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
}
numberOfUploadRequests++;
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
}
}
@Override
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
initialized = true;
return RandomizedTest.randomAsciiOfLength(50);
}
@Override
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
}
return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50));
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
}
}
@Override
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts) throws AmazonS3Exception {
completed = true;
}
@Override
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception {
aborted = true;
}
public int getNumberOfUploadRequests() {
return numberOfUploadRequests;
}
public boolean isMultipart() {
return (numberOfUploadRequests > 1) && initialized && completed && !aborted;
}
public byte[] toByteArray() {
return out.toByteArray();
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.io.Streams.copy;
import static org.hamcrest.Matchers.equalTo;
/**
* Unit test for {@link S3OutputStream}.
*/
public class S3OutputStreamTest extends ElasticsearchTestCase {
private static final int BUFFER_SIZE = S3BlobStore.MIN_BUFFER_SIZE.bytesAsInt();
@Test
public void testWriteLessDataThanBufferSize() throws IOException {
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes("UTF-8");
copy(content, out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) content.length));
assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
assertFalse(out.isMultipart());
}
@Test
public void testWriteSameDataThanBufferSize() throws IOException {
int size = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(size);
ByteArrayOutputStream content = new ByteArrayOutputStream(size);
for (int i = 0; i < size; i++) {
content.write(randomByte());
}
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) size));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(size));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
assertFalse(out.isMultipart());
}
@Test @Slow
public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException {
int n = randomIntBetween(2, 3);
int length = n * BUFFER_SIZE;
ByteArrayOutputStream content = new ByteArrayOutputStream(length);
for (int i = 0; i < length; i++) {
content.write(randomByte());
}
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) length));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(n));
assertThat(out.getNumberOfUploadRequests(), equalTo(n));
assertTrue(out.isMultipart());
}
@Test
public void testWriteRandomNumberOfBytes() throws IOException {
Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
Integer randomLength = randomIntBetween(1, 2 * BUFFER_SIZE);
ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
for (int i = 0; i < randomLength; i++) {
content.write(randomByte());
}
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) randomLength));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
assertThat(out.getBufferSize(), equalTo(randomBufferSize));
int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue());
assertThat(out.getFlushCount(), equalTo(times));
if (times > 1) {
assertTrue(out.isMultipart());
} else {
assertFalse(out.isMultipart());
}
}
@Test(expected = IllegalArgumentException.class)
public void testWrongBufferSize() throws IOException {
Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024);
// The constructor must reject any buffer smaller than the 5MB minimum.
MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
fail("Buffer size can't be smaller than 5MB");
}
private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) {
return new MockDefaultS3OutputStream(bufferSizeInBytes);
}
}
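
The expected flush count in the last two tests above is simply the ceiling of the payload length over the buffer size, and the upload is multipart exactly when more than one flush happens. A worked instance with illustrative numbers (the class is a hypothetical placeholder):

```java
// Illustrative numbers only: a 12MB payload through a 5MB buffer flushes 3 times,
// so the mock sees 3 upload requests and reports a multipart upload.
public class FlushCountSketch {
    public static void main(String[] args) {
        int bufferSize = 5 * 1024 * 1024;
        int length = 12 * 1024 * 1024;
        int flushes = (int) Math.ceil((double) length / bufferSize); // ceil(12/5) = 3
        System.out.println(flushes + " flushes, multipart = " + (flushes > 1)); // 3, true
    }
}
```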

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.cloud.aws.AbstractAwsTest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
/**
* Just an empty node start test to check everything is fine when
* starting.
* This test requires AWS to run.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class Ec2DiscoveryITest extends AbstractAwsTest {
@Test
public void testStart() {
Settings nodeSettings = settingsBuilder()
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.put("cloud.enabled", true)
.put("discovery.type", "ec2")
.build();
internalCluster().startNode(nodeSettings);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cloud.aws.AbstractAwsTest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.CoreMatchers.is;
/**
* Just an empty node start test to check everything is fine when
* starting.
* This test requires AWS to run.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class Ec2DiscoveryUpdateSettingsITest extends AbstractAwsTest {
@Test
public void testMinimumMasterNodesStart() {
Settings nodeSettings = settingsBuilder()
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.put("cloud.enabled", true)
.put("discovery.type", "ec2")
.build();
internalCluster().startNode(nodeSettings);
// We try to update minimum_master_nodes now
ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(settingsBuilder().put("discovery.zen.minimum_master_nodes", 1))
.setTransientSettings(settingsBuilder().put("discovery.zen.minimum_master_nodes", 1))
.get();
Integer min = response.getPersistentSettings().getAsInt("discovery.zen.minimum_master_nodes", null);
assertThat(min, is(1));
}
}

View File

@ -0,0 +1,502 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.cloud.aws.AbstractAwsTest;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.*;
/**
 * Base class for the S3 snapshot/restore integration tests. Requires AWS credentials
 * and the test buckets described in the README.
 */
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
@Override
public Settings indexSettings() {
// During restore we frequently restore an index to exactly the same state it was in before,
// which might cause the same checksum file to be written twice during the restore operation.
return Settings.builder().put(super.indexSettings())
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false)
.put("cloud.enabled", true)
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.build();
}
private String basePath;
@Before
public final void wipeBefore() {
wipeRepositories();
basePath = "repo-" + randomInt();
cleanRepositoryFiles(basePath);
}
@After
public final void wipeAfter() {
wipeRepositories();
cleanRepositoryFiles(basePath);
}
@Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testSimpleWorkflow() {
Client client = client();
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("chunk_size", randomIntBetween(1000, 10000))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
cluster().wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
@Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testEncryption() {
Client client = client();
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("chunk_size", randomIntBetween(1000, 10000))
.put("server_side_encryption", true)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
null,
null,
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
List<S3ObjectSummary> summaries = s3Client.listObjects(bucketName, basePath).getObjectSummaries();
for (S3ObjectSummary summary : summaries) {
assertThat(s3Client.getObjectMetadata(bucketName, summary.getKey()).getSSEAlgorithm(), equalTo("AES256"));
}
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
cluster().wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
/**
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryWithCustomCredentials()} pointless.
*/
@Test(expected = RepositoryVerificationException.class)
public void assertRepositoryWithCustomCredentialsIsNotAccessibleByDefaultCredentials() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
).get();
fail("repository verification should have raise an exception!");
}
@Test
public void testRepositoryWithCustomCredentials() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("region", bucketSettings.get("region"))
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("bucket", bucketSettings.get("bucket"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
@Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testRepositoryWithCustomEndpointProtocol() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.external-bucket.");
logger.info("--> creating s3 repostoriy with endpoint [{}], bucket[{}] and path [{}]", bucketSettings.get("endpoint"), bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("bucket", bucketSettings.get("bucket"))
.put("endpoint", bucketSettings.get("endpoint"))
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("base_path", basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
/**
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
*/
@Test(expected = RepositoryVerificationException.class)
public void assertRepositoryInRemoteRegionIsRemote() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
// Below setting intentionally omitted to assert bucket is not available in default region.
// .put("region", privateBucketSettings.get("region"))
).get();
fail("repository verification should have raise an exception!");
}
@Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testRepositoryInRemoteRegion() {
Client client = client();
Settings settings = internalCluster().getInstance(Settings.class);
Settings bucketSettings = settings.getByPrefix("repositories.s3.remote-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
.put("region", bucketSettings.get("region"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
}
/**
* Test case for issue #86: https://github.com/elasticsearch/elasticsearch-cloud-aws/issues/86
*/
@Test
public void testNonExistingRepo_86() {
Client client = client();
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> restore non existing snapshot");
try {
client.admin().cluster().prepareRestoreSnapshot("test-repo", "no-existing-snapshot").setWaitForCompletion(true).execute().actionGet();
fail("Shouldn't be here");
} catch (SnapshotMissingException ex) {
// Expected
}
}
/**
* For issue #86: https://github.com/elasticsearch/elasticsearch-cloud-aws/issues/86
*/
@Test
public void testGetDeleteNonExistingSnapshot_86() {
ClusterAdminClient client = client().admin().cluster();
logger.info("--> creating s3 repository without any path");
PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
try {
client.prepareGetSnapshots("test-repo").addSnapshots("no-existing-snapshot").get();
fail("Shouldn't be here");
} catch (SnapshotMissingException ex) {
// Expected
}
try {
client.prepareDeleteSnapshot("test-repo", "no-existing-snapshot").get();
fail("Shouldn't be here");
} catch (SnapshotMissingException ex) {
// Expected
}
}
private void assertRepositoryIsOperational(Client client, String repository) {
createIndex("test-idx-1");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repository, "test-snap").setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots(repository).setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repository, "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
}
/**
* Deletes repositories, supports wildcard notation.
*/
public static void wipeRepositories(String... repositories) {
// if nothing is provided, delete all
if (repositories.length == 0) {
repositories = new String[]{"*"};
}
for (String repository : repositories) {
try {
client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
} catch (RepositoryMissingException ex) {
// ignore
}
}
}
/**
* Deletes the content of the repository files in the bucket
*/
public void cleanRepositoryFiles(String basePath) {
Settings settings = internalCluster().getInstance(Settings.class);
Settings[] buckets = {
settings.getByPrefix("repositories.s3."),
settings.getByPrefix("repositories.s3.private-bucket."),
settings.getByPrefix("repositories.s3.remote-bucket."),
settings.getByPrefix("repositories.s3.external-bucket.")
};
for (Settings bucket : buckets) {
String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint"));
String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));
String region = bucket.get("region", settings.get("repositories.s3.region"));
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
String bucketName = bucket.get("bucket");
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
try {
ObjectListing prevListing = null;
// From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
// we can do at most 1K objects per delete.
// We don't know the bucket name until the first object listing.
DeleteObjectsRequest multiObjectDeleteRequest = null;
ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();
while (true) {
ObjectListing list;
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
list = client.listObjects(bucketName, basePath);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
// Send a batched delete request every 500 keys.
if (keys.size() > 500) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
keys.clear();
}
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
if (!keys.isEmpty()) {
multiObjectDeleteRequest.setKeys(keys);
client.deleteObjects(multiObjectDeleteRequest);
}
} catch (Throwable ex) {
logger.warn("Failed to delete S3 repository [{}] in [{}]", ex, bucketName, region);
}
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.settings.Settings;
import org.junit.Before;
/**
* This test will only run if you define an S3-specific proxy in your `elasticsearch.yml` file:
* cloud.aws.s3.proxy_host: mys3proxy.company.com
* cloud.aws.s3.proxy_port: 8080
*/
public class S3ProxiedSnapshotRestoreOverHttpsTest extends AbstractS3SnapshotRestoreTest {
private boolean proxySet = false;
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings settings = super.nodeSettings(nodeOrdinal);
String proxyHost = settings.get("cloud.aws.s3.proxy_host");
proxySet = proxyHost != null;
return settings;
}
@Before
public void checkProxySettings() {
assumeTrue("we are expecting proxy settings in elasticsearch.yml file", proxySet);
}
}
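
For context, proxy settings like the ones this test checks for generally end up on the AWS SDK's `ClientConfiguration`. A hedged sketch of that wiring follows; it is illustrative rather than the plugin's actual code, and the class name and credentials are placeholders:

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.common.settings.Settings;

// Sketch: translate the plugin's S3 proxy settings into an SDK ClientConfiguration.
public class ProxyWiringSketch {
    static AmazonS3Client client(Settings settings) {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProtocol(Protocol.HTTPS); // proxied requests still go over HTTPS here
        String proxyHost = settings.get("cloud.aws.s3.proxy_host");
        if (proxyHost != null) {
            conf.setProxyHost(proxyHost);
            conf.setProxyPort(settings.getAsInt("cloud.aws.s3.proxy_port", 80));
        }
        return new AmazonS3Client(new BasicAWSCredentials("access", "secret"), conf);
    }
}
```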

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.settings.Settings;
/**
*/
public class S3SnapshotRestoreOverHttpTest extends AbstractS3SnapshotRestoreTest {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("cloud.aws.s3.protocol", "http");
return settings.build();
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.settings.Settings;
/**
*/
public class S3SnapshotRestoreOverHttpsTest extends AbstractS3SnapshotRestoreTest {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("cloud.aws.s3.protocol", "https");
return settings.build();
}
}