Update and rebase the init implementation.

Also removes the MD5 checks from our side, AWS S3 SDK java is doing the
check.
This commit is contained in:
Xu Zhang 2016-02-25 17:13:31 -08:00
parent ea78fd6560
commit 7499e3aa4a
9 changed files with 219 additions and 223 deletions

View File

@ -19,11 +19,6 @@
package org.elasticsearch.http; package org.elasticsearch.http;
<<<<<<< HEAD:core/src/main/java/org/elasticsearch/http/HttpServerTransport.java
=======
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.EncryptionMaterials;
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.BoundTransportAddress;
@ -34,15 +29,9 @@ public interface HttpServerTransport extends LifecycleComponent<HttpServerTransp
BoundTransportAddress boundAddress(); BoundTransportAddress boundAddress();
<<<<<<< HEAD:core/src/main/java/org/elasticsearch/http/HttpServerTransport.java
HttpInfo info(); HttpInfo info();
HttpStats stats(); HttpStats stats();
void httpServerAdapter(HttpServerAdapter httpServerAdapter); void httpServerAdapter(HttpServerAdapter httpServerAdapter);
=======
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries, EncryptionMaterials clientSideEncryptionMaterials);
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java
} }

View File

@ -236,6 +236,16 @@ The following settings are supported:
currently supported by the plugin. For more information about the currently supported by the plugin. For more information about the
different classes, see http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html[AWS Storage Classes Guide] different classes, see http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html[AWS Storage Classes Guide]
`client_symmetric_key`::
Sets the keys to use to encrypt your snapshots. You can specify either a symmetric key or a public/private key pair.
No encryption by default. This sets a Base64-encoded AES symmetric-key (128, 192 or 256 bits)
`client_public_key`::
Sets the a base64-encoded RSA public key
`client_private_key`::
Sets the a base64-encoded RSA private key
The S3 repositories use the same credentials as the rest of the AWS services The S3 repositories use the same credentials as the rest of the AWS services
provided by this plugin (`discovery`). See <<repository-s3-usage>> for details. provided by this plugin (`discovery`). See <<repository-s3-usage>> for details.

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol; import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
@ -154,4 +155,5 @@ public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
} }
AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries); AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries);
AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries, EncryptionMaterials clientSideEncryptionMaterials);
} }

View File

@ -31,15 +31,11 @@ import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.internal.StaticCredentialsProvider; import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
<<<<<<< HEAD:plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
=======
import com.amazonaws.services.s3.AmazonS3EncryptionClient; import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.model.CryptoConfiguration; import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptionMaterials; import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
@ -56,15 +52,9 @@ import java.util.Map;
public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service> implements AwsS3Service { public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service> implements AwsS3Service {
/** /**
<<<<<<< HEAD:plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
* (acceskey, endpoint) -&gt; client * (acceskey, endpoint) -&gt; client
*/ */
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<>(); private Map<Tuple<String, Tuple<String, EncryptionMaterials>>, AmazonS3Client> clients = new HashMap<>();
=======
* (acceskey, (endpoint, clientSideEncryptionKey)) -> client
*/
private Map<Tuple<String, Tuple<String, EncryptionMaterials>>, AmazonS3Client> clients = new HashMap<Tuple<String,Tuple<String, EncryptionMaterials>>, AmazonS3Client>();
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
@Inject @Inject
public InternalAwsS3Service(Settings settings) { public InternalAwsS3Service(Settings settings) {
@ -72,8 +62,12 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
} }
@Override @Override
<<<<<<< HEAD:plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) { public synchronized AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries) {
return client(endpoint, protocol, region, account, key, maxRetries, null);
}
@Override
public AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key, Integer maxRetries, EncryptionMaterials clientSideEncryptionMaterials) {
if (Strings.isNullOrEmpty(endpoint)) { if (Strings.isNullOrEmpty(endpoint)) {
// We need to set the endpoint based on the region // We need to set the endpoint based on the region
if (region != null) { if (region != null) {
@ -83,50 +77,16 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
// No region has been set so we will use the default endpoint // No region has been set so we will use the default endpoint
endpoint = getDefaultEndpoint(); endpoint = getDefaultEndpoint();
} }
=======
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = settings.get("cloud.aws.access_key", settings.get("cloud.account"));
String key = settings.get("cloud.aws.secret_key", settings.get("cloud.key"));
return getClient(endpoint, null, account, key, null, null);
}
@Override
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return client(endpoint, protocol, region, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
return client(endpoint, protocol, region, account, key, maxRetries, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries, EncryptionMaterials clientSideEncryptionMaterials) {
if (region != null && endpoint == null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
} else if (endpoint == null) {
endpoint = getDefaultEndpoint();
}
if (account == null || key == null) {
account = settings.get("cloud.aws.access_key", settings.get("cloud.account"));
key = settings.get("cloud.aws.secret_key", settings.get("cloud.key"));
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
} }
return getClient(endpoint, protocol, account, key, maxRetries, clientSideEncryptionMaterials); return getClient(endpoint, protocol, account, key, maxRetries, clientSideEncryptionMaterials);
} }
<<<<<<< HEAD:plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java private synchronized AmazonS3 getClient(String endpoint, Protocol protocol, String account, String key, Integer maxRetries,
private synchronized AmazonS3 getClient(String endpoint, Protocol protocol, String account, String key, Integer maxRetries) { EncryptionMaterials clientSideEncryptionMaterials) {
Tuple<String, String> clientDescriptor = new Tuple<>(endpoint, account);
=======
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries, EncryptionMaterials clientSideEncryptionMaterials) { Tuple<String, EncryptionMaterials> tempTuple = new Tuple<>(account, clientSideEncryptionMaterials);
Tuple<String, Tuple<String, EncryptionMaterials>> clientDescriptor = new Tuple<String, Tuple<String, EncryptionMaterials>>(endpoint, new Tuple(account, clientSideEncryptionMaterials)); Tuple<String, Tuple<String, EncryptionMaterials>> clientDescriptor = new Tuple<>(endpoint, tempTuple);
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
AmazonS3Client client = clients.get(clientDescriptor); AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) { if (client != null) {
return client; return client;
@ -177,7 +137,7 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
); );
} }
if(clientSideEncryptionMaterials != null) { if (clientSideEncryptionMaterials != null) {
EncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(clientSideEncryptionMaterials); EncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(clientSideEncryptionMaterials);
CryptoConfiguration cryptoConfiguration = new CryptoConfiguration(); CryptoConfiguration cryptoConfiguration = new CryptoConfiguration();
client = new AmazonS3EncryptionClient( client = new AmazonS3EncryptionClient(

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cloud.aws.blobstore; package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@ -131,31 +133,10 @@ public class DefaultS3OutputStream extends S3OutputStream {
} }
md.setContentLength(length); md.setContentLength(length);
InputStream inputStream = is; PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, is, md)
// We try to compute a MD5 while reading it
MessageDigest messageDigest;
try {
messageDigest = MessageDigest.getInstance("MD5");
inputStream = new DigestInputStream(is, messageDigest);
} catch (NoSuchAlgorithmException impossible) {
// Every implementation of the Java platform is required to support MD5 (see MessageDigest)
throw new RuntimeException(impossible);
}
PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, inputStream, md)
.withStorageClass(blobStore.getStorageClass()) .withStorageClass(blobStore.getStorageClass())
.withCannedAcl(blobStore.getCannedACL()); .withCannedAcl(blobStore.getCannedACL());
PutObjectResult putObjectResult = blobStore.client().putObject(putRequest); blobStore.client().putObject(putRequest);
String localMd5 = Base64.encodeAsString(messageDigest.digest());
String remoteMd5 = putObjectResult.getContentMd5();
if (!localMd5.equals(remoteMd5)) {
logger.debug("MD5 local [{}], remote [{}] are not equal...", localMd5, remoteMd5);
throw new AmazonS3Exception("MD5 local [" + localMd5 +
"], remote [" + remoteMd5 +
"] are not equal...");
}
} }
private void initializeMultipart() { private void initializeMultipart() {

View File

@ -21,13 +21,10 @@ package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.AmazonS3Exception;
<<<<<<< HEAD:plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CreateBucketRequest; import com.amazonaws.services.s3.model.CreateBucketRequest;
=======
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
>>>>>>> 98d508f... Add client-side encryption:src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectListing;
@ -79,7 +76,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
throw new BlobStoreException("Detected client-side encryption " + throw new BlobStoreException("Detected client-side encryption " +
"and a buffer_size for the S3 storage not a multiple of the cipher block size (16)"); "and a buffer_size for the S3 storage not a multiple of the cipher block size (16)");
} }
this.cannedACL = initCannedACL(cannedACL); this.cannedACL = initCannedACL(cannedACL);
this.numberOfRetries = maxRetries; this.numberOfRetries = maxRetries;
this.storageClass = initStorageClass(storageClass); this.storageClass = initStorageClass(storageClass);

View File

@ -128,6 +128,9 @@ public class S3RepositoryPlugin extends Plugin {
settingsModule.registerSetting(S3Repository.Repositories.STORAGE_CLASS_SETTING); settingsModule.registerSetting(S3Repository.Repositories.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CANNED_ACL_SETTING); settingsModule.registerSetting(S3Repository.Repositories.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.BASE_PATH_SETTING); settingsModule.registerSetting(S3Repository.Repositories.BASE_PATH_SETTING);
settingsModule.registerSetting(S3Repository.Repositories.CLIENT_PRIVATE_KEY);
settingsModule.registerSetting(S3Repository.Repositories.CLIENT_PUBLIC_KEY);
settingsModule.registerSetting(S3Repository.Repositories.CLIENT_SYMMETRIC_KEY);
// Register S3 single repository settings // Register S3 single repository settings
settingsModule.registerSetting(S3Repository.Repository.KEY_SETTING); settingsModule.registerSetting(S3Repository.Repository.KEY_SETTING);
@ -144,6 +147,10 @@ public class S3RepositoryPlugin extends Plugin {
settingsModule.registerSetting(S3Repository.Repository.STORAGE_CLASS_SETTING); settingsModule.registerSetting(S3Repository.Repository.STORAGE_CLASS_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CANNED_ACL_SETTING); settingsModule.registerSetting(S3Repository.Repository.CANNED_ACL_SETTING);
settingsModule.registerSetting(S3Repository.Repository.BASE_PATH_SETTING); settingsModule.registerSetting(S3Repository.Repository.BASE_PATH_SETTING);
settingsModule.registerSetting(S3Repository.Repository.CLIENT_PRIVATE_KEY);
settingsModule.registerSetting(S3Repository.Repository.CLIENT_PUBLIC_KEY);
settingsModule.registerSetting(S3Repository.Repository.CLIENT_SYMMETRIC_KEY);
} }
/** /**

View File

@ -20,6 +20,8 @@
package org.elasticsearch.repositories.s3; package org.elasticsearch.repositories.s3;
import com.amazonaws.Protocol; import com.amazonaws.Protocol;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.util.Base64;
import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_S3; import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_S3;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
@ -37,7 +39,15 @@ import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException; import java.io.IOException;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Locale; import java.util.Locale;
import java.util.function.Function; import java.util.function.Function;
@ -140,6 +150,21 @@ public class S3Repository extends BlobStoreRepository {
* repositories.s3.base_path: Specifies the path within bucket to repository data. Defaults to root directory. * repositories.s3.base_path: Specifies the path within bucket to repository data. Defaults to root directory.
*/ */
Setting<String> BASE_PATH_SETTING = Setting.simpleString("repositories.s3.base_path", Property.NodeScope); Setting<String> BASE_PATH_SETTING = Setting.simpleString("repositories.s3.base_path", Property.NodeScope);
/**
* repositories.s3.client_symmetric_key: Specifies the Base64-encoded AES symmetric-key (128, 192 or 256 bits)
*/
Setting<String> CLIENT_SYMMETRIC_KEY = Setting.simpleString("repositories.s3.client_symmetric_key", Property.NodeScope);
/**
* repositories.s3.client_public_key: Specifies the Base64-encoded RSA public key
*/
Setting<String> CLIENT_PUBLIC_KEY = Setting.simpleString("repositories.s3.client_public_key", Property.NodeScope);
/**
* repositories.s3.client_private_key: Specifies the Base64-encoded RSA private key
*/
Setting<String> CLIENT_PRIVATE_KEY = Setting.simpleString("repositories.s3.client_private_key", Property.NodeScope);
} }
/** /**
@ -222,7 +247,26 @@ public class S3Repository extends BlobStoreRepository {
* base_path * base_path
* @see Repositories#BASE_PATH_SETTING * @see Repositories#BASE_PATH_SETTING
*/ */
Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path", Property.NodeScope); Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path", Property.NodeScope);
/**
* base_path
* @see Repositories#CLIENT_SYMMETRIC_KEY
*/
Setting<String> CLIENT_SYMMETRIC_KEY = Setting.simpleString("client_symmetric_key", Property.NodeScope);
/**
* base_path
* @see Repositories#CLIENT_PUBLIC_KEY
*/
Setting<String> CLIENT_PUBLIC_KEY = Setting.simpleString("client_public_key", Property.NodeScope);
/**
* base_path
* @see Repositories#CLIENT_PRIVATE_KEY
*/
Setting<String> CLIENT_PRIVATE_KEY = Setting.simpleString("client_private_key", Property.NodeScope);
} }
private final S3BlobStore blobStore; private final S3BlobStore blobStore;
@ -275,14 +319,30 @@ public class S3Repository extends BlobStoreRepository {
String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING); String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING); String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]", logger.debug("using bqucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}], cannedACL [{}], storageClass [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass); bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING); String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING); String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, key, secret, maxRetries), // parse and validate the client side encryption setting
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass); String symmetricKeyBase64 = getValue(repositorySettings, Repository.CLIENT_SYMMETRIC_KEY, Repositories.CLIENT_SYMMETRIC_KEY);
String publicKeyBase64 =getValue(repositorySettings, Repository.CLIENT_PUBLIC_KEY, Repositories.CLIENT_PUBLIC_KEY);
String privateKeyBase64 = getValue(repositorySettings, Repository.CLIENT_PRIVATE_KEY, Repositories.CLIENT_PRIVATE_KEY);
EncryptionMaterials clientSideEncryptionMaterials = initClientSideEncryption(symmetricKeyBase64, publicKeyBase64, privateKeyBase64, name);
blobStore = new S3BlobStore(
settings,
s3Service.client(endpoint, protocol, region, key, secret, maxRetries, clientSideEncryptionMaterials),
bucket,
region,
serverSideEncryption,
bufferSize,
maxRetries,
cannedACL,
storageClass
);
String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING); String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) { if (Strings.hasLength(basePath)) {
@ -294,6 +354,52 @@ public class S3Repository extends BlobStoreRepository {
} else { } else {
this.basePath = BlobPath.cleanPath(); this.basePath = BlobPath.cleanPath();
} }
}
/**
* Init and verify initClientSideEncryption settings
*/
private EncryptionMaterials initClientSideEncryption(String symmetricKey, String publicKey, String privateKey, RepositoryName name) {
EncryptionMaterials clientSideEncryptionMaterials = null;
if (Strings.isNullOrEmpty(symmetricKey) == false && (Strings.isNullOrEmpty(publicKey) == false || Strings.isNullOrEmpty(privateKey) == false)) {
throw new RepositoryException(name.name(), "Client-side encryption: You can't specify an symmetric key AND a public/private key pair");
}
if (Strings.isNullOrEmpty(symmetricKey) == false || Strings.isNullOrEmpty(publicKey) == false || Strings.isNullOrEmpty(privateKey) == false) {
try {
// Check crypto
if (Cipher.getMaxAllowedKeyLength("AES") < 256) {
throw new RepositoryException(name.name(), "Client-side encryption: Please install the Java Cryptography Extension");
}
// Transform the keys in a EncryptionMaterials
if (Strings.isNullOrEmpty(symmetricKey) == false) {
clientSideEncryptionMaterials = new EncryptionMaterials(new SecretKeySpec(Base64.decode(symmetricKey), "AES"));
} else {
if (Strings.isNullOrEmpty(publicKey)|| Strings.isNullOrEmpty(privateKey)){
String missingKey = Strings.isNullOrEmpty(publicKey) ? "public key" : "private key";
throw new RepositoryException(name.name(), "Client-side encryption: " + missingKey + " is missing");
}
clientSideEncryptionMaterials = new EncryptionMaterials(new KeyPair(
KeyFactory.getInstance("RSA").generatePublic(new X509EncodedKeySpec(Base64.decode(publicKey))),
KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(Base64.decode(privateKey)))));
}
} catch (IllegalArgumentException e) {
throw new RepositoryException(name.name(), "Client-side encryption: Error decoding your keys: " + e.getMessage());
} catch (NoSuchAlgorithmException e) {
throw new RepositoryException(name.name(), e.getMessage());
} catch (InvalidKeySpecException e) {
throw new RepositoryException(name.name(), e.getMessage());
}
}
return clientSideEncryptionMaterials;
} }
/** /**

View File

@ -23,15 +23,9 @@ import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3ObjectSummary;
<<<<<<< HEAD:plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
=======
import com.amazonaws.util.Base64; import com.amazonaws.util.Base64;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.gson.stream.JsonReader;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.gson.stream.MalformedJsonException;
>>>>>>> 98d508f... Add client-side encryption:src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@ -41,11 +35,7 @@ import org.elasticsearch.cloud.aws.AbstractAwsTestCase;
import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
<<<<<<< HEAD:plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
=======
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
>>>>>>> 98d508f... Add client-side encryption:src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotMissingException;
@ -56,8 +46,10 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import javax.crypto.KeyGenerator; import javax.crypto.KeyGenerator;
import java.io.InputStreamReader; import java.security.KeyPair;
import java.security.*; import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -112,7 +104,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath); logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(settings .setType("s3").setSettings(settings
).get(); ).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3"); createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@ -135,7 +127,6 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertMetadataFileIsNotEncrypted("test-snap");
logger.info("--> delete some data"); logger.info("--> delete some data");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
@ -267,77 +258,6 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
} }
@Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testClientSideEncryption() throws NoSuchAlgorithmException {
KeyGenerator keyGenerator1 = KeyGenerator.getInstance("AES");
keyGenerator1.init(128);
String symmetricEncryptionKeyBase64 = Base64.encodeAsString(keyGenerator1.generateKey().getEncoded());
KeyPairGenerator keyGenerator2 = KeyPairGenerator.getInstance("RSA");
keyGenerator2.initialize(512, new SecureRandom());
KeyPair keyPair = keyGenerator2.generateKeyPair();
String publicEncryptionKeyBase64 = Base64.encodeAsString(keyPair.getPublic().getEncoded());
String privateEncryptionKeyBase64 = Base64.encodeAsString(keyPair.getPrivate().getEncoded());
Client client = client();
try {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.symmetric", symmetricEncryptionKeyBase64)
.put("client_side_encryption_key.public", publicEncryptionKeyBase64)
.put("client_side_encryption_key.private", privateEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000))
).get();
fail("Symmetric and public/private key pairs are exclusive options. An exception should be thrown.");
} catch(RepositoryException e) {
}
List<ImmutableSettings.Builder> allSettings = Arrays.asList(
ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.symmetric", symmetricEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000)),
ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.public", publicEncryptionKeyBase64)
.put("client_side_encryption_key.private", privateEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000))
);
for(ImmutableSettings.Builder settings: allSettings) {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(settings).get();
// Create the index and index some data
createIndex("test-idx-1");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
// Take the snapshot
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertMetadataFileIsEncrypted("test-snap");
// Restore
cluster().wipeIndices("test-idx-1");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").execute().actionGet();
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
// Clean, the test will bbe run with different settings
cluster().wipeIndices("test-idx-1");
wipeRepositories();
cleanRepositoryFiles(basePath);
}
}
/** /**
* This test verifies that the test configuration is set up in a manner that * This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryWithCustomCredentials()} pointless. * does not make the test {@link #testRepositoryWithCustomCredentials()} pointless.
@ -488,6 +408,75 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
} }
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testClientSideEncryption() throws NoSuchAlgorithmException {
KeyGenerator keyGenerator1 = KeyGenerator.getInstance("AES");
keyGenerator1.init(128);
String symmetricEncryptionKeyBase64 = Base64.encodeAsString(keyGenerator1.generateKey().getEncoded());
KeyPairGenerator keyGenerator2 = KeyPairGenerator.getInstance("RSA");
keyGenerator2.initialize(512, new SecureRandom());
KeyPair keyPair = keyGenerator2.generateKeyPair();
String publicEncryptionKeyBase64 = Base64.encodeAsString(keyPair.getPublic().getEncoded());
String privateEncryptionKeyBase64 = Base64.encodeAsString(keyPair.getPrivate().getEncoded());
Client client = client();
try {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(Settings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.symmetric", symmetricEncryptionKeyBase64)
.put("client_side_encryption_key.public", publicEncryptionKeyBase64)
.put("client_side_encryption_key.private", privateEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000))
).get();
fail("Symmetric and public/private key pairs are exclusive options. An exception should be thrown.");
} catch (RepositoryException e) {
}
List<Settings.Builder> allSettings = Arrays.asList(
Settings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.symmetric", symmetricEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000)),
Settings.settingsBuilder()
.put("base_path", basePath)
.put("client_side_encryption_key.public", publicEncryptionKeyBase64)
.put("client_side_encryption_key.private", privateEncryptionKeyBase64)
.put("chunk_size", randomIntBetween(1000, 10000))
);
for (Settings.Builder settings : allSettings) {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(settings).get();
// Create the index and index some data
createIndex("test-idx-1");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
// Take the snapshot
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
// Restore
cluster().wipeIndices("test-idx-1");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").execute().actionGet();
ensureGreen();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().totalHits(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
// Clean, the test will bbe run with different settings
cluster().wipeIndices("test-idx-1");
wipeRepositories();
cleanRepositoryFiles(basePath);
}
}
private void assertRepositoryIsOperational(Client client, String repository) { private void assertRepositoryIsOperational(Client client, String repository) {
createIndex("test-idx-1"); createIndex("test-idx-1");
ensureGreen(); ensureGreen();
@ -524,51 +513,6 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().totalHits(), equalTo(100L)); assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().totalHits(), equalTo(100L));
} }
private void assertMetadataFileIsEncrypted(String snapshotName) {
Settings settings = internalCluster().getInstance(Settings.class);
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
settings.get("repositories.s3.endpoint"),
settings.get("repositories.s3.protocol"),
settings.get("repositories.s3.region"),
settings.get("cloud.aws.access_key"),
settings.get("cloud.aws.secret_key"));
String bucket = settings.get("repositories.s3.bucket");
String objectKey = basePath + "/metadata-" + snapshotName;
S3Object object = s3Client.getObject(bucket, objectKey);
try {
JsonReader jsonReader = new JsonReader(new InputStreamReader(object.getObjectContent()));
jsonReader.beginObject();
assertThat("The file hasn't been encrypted properly, its content is still readable!", jsonReader.nextName(), not(equalTo("meta-data")));
} catch(Exception e) {
// The json is not valid, the file is encrypted
// MalformedJsonException can't be catched directly so the following
// assertion is necessary to avoid silent failures.
assertThat(e, instanceOf(MalformedJsonException.class));
}
}
private void assertMetadataFileIsNotEncrypted(String snapshotName) {
Settings settings = internalCluster().getInstance(Settings.class);
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
settings.get("repositories.s3.endpoint"),
settings.get("repositories.s3.protocol"),
settings.get("repositories.s3.region"),
settings.get("cloud.aws.access_key"),
settings.get("cloud.aws.secret_key"));
String bucket = settings.get("repositories.s3.bucket");
String objectKey = basePath + "/metadata-" + snapshotName;
S3Object object = s3Client.getObject(bucket, objectKey);
JsonReader jsonReader = new JsonReader(new InputStreamReader(object.getObjectContent()));
jsonReader.beginObject();
assertThat("The file wasn't decrypted properly", jsonReader.nextName(), equalTo("meta-data"));
// The beginning of the file looks like json. If it was encrypted, it wouldn't.
}
/** /**
* Deletes repositories, supports wildcard notation. * Deletes repositories, supports wildcard notation.