diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java index c26e88887d0..67e6889abb0 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java @@ -44,10 +44,10 @@ import java.util.List; *

* Quick facts about S3: *

- * Maximum object size: 5 TB - * Maximum number of parts per upload: 10,000 + * Maximum object size: 5 TB + * Maximum number of parts per upload: 10,000 * Part numbers: 1 to 10,000 (inclusive) - * Part size: 5 MB to 5 GB, last part can be < 5 MB + * Part size: 5 MB to 5 GB, last part can be < 5 MB *

* See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html @@ -120,7 +120,7 @@ public class DefaultS3OutputStream extends S3OutputStream { } protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, - boolean serverSideEncryption) throws AmazonS3Exception { + boolean serverSideEncryption) throws AmazonS3Exception { ObjectMetadata md = new ObjectMetadata(); if (serverSideEncryption) { md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); @@ -202,15 +202,15 @@ public class DefaultS3OutputStream extends S3OutputStream { } protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, - int length, boolean lastPart) throws AmazonS3Exception { + int length, boolean lastPart) throws AmazonS3Exception { UploadPartRequest request = new UploadPartRequest() - .withBucketName(bucketName) - .withKey(blobName) - .withUploadId(uploadId) - .withPartNumber(multipartChunks) - .withInputStream(is) - .withPartSize(length) - .withLastPart(lastPart); + .withBucketName(bucketName) + .withKey(blobName) + .withUploadId(uploadId) + .withPartNumber(multipartChunks) + .withInputStream(is) + .withPartSize(length) + .withLastPart(lastPart); UploadPartResult response = blobStore.client().uploadPart(request); return response.getPartETag(); diff --git a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java index c3807745a58..09fa1266de2 100644 --- a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java +++ b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java @@ -91,7 +91,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { .setType("s3").setSettings(ImmutableSettings.settingsBuilder() .put("base_path", basePath) .put("chunk_size", randomIntBetween(1000, 10000)) - ).get(); + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); createIndex("test-idx-1", "test-idx-2", "test-idx-3"); @@ -154,94 +154,94 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); } - + @Test @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211") public void testEncryption() { - Client client = client(); - logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath); - PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") - .setType("s3").setSettings(ImmutableSettings.settingsBuilder() - .put("base_path", basePath) - .put("chunk_size", randomIntBetween(1000, 10000)) - .put("server_side_encryption", true) - ).get(); - assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + Client client = client(); + logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("chunk_size", randomIntBetween(1000, 10000)) + .put("server_side_encryption", true) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); - createIndex("test-idx-1", "test-idx-2", "test-idx-3"); - ensureGreen(); + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); - logger.info("--> indexing some data"); - for (int i = 0; i < 100; i++) { - index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); - index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); - index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i); - } - refresh(); - assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); - assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); - assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L)); + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); + index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L)); - logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); - Settings settings = internalCluster().getInstance(Settings.class); - Settings bucket = settings.getByPrefix("repositories.s3."); - AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client( - null, - null, - bucket.get("region", settings.get("repositories.s3.region")), - bucket.get("access_key", settings.get("cloud.aws.access_key")), - bucket.get("secret_key", settings.get("cloud.aws.secret_key"))); + Settings settings = internalCluster().getInstance(Settings.class); + Settings bucket = settings.getByPrefix("repositories.s3."); + AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client( + null, + null, + bucket.get("region", settings.get("repositories.s3.region")), + bucket.get("access_key", settings.get("cloud.aws.access_key")), + bucket.get("secret_key", settings.get("cloud.aws.secret_key"))); - String bucketName = bucket.get("bucket"); - logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath); - List summaries = s3Client.listObjects(bucketName, basePath).getObjectSummaries(); - for (S3ObjectSummary summary : summaries) { - assertThat(s3Client.getObjectMetadata(bucketName, summary.getKey()).getSSEAlgorithm(), equalTo("AES256")); - } + String bucketName = bucket.get("bucket"); + logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath); + List summaries = s3Client.listObjects(bucketName, basePath).getObjectSummaries(); + for (S3ObjectSummary summary : summaries) { + assertThat(s3Client.getObjectMetadata(bucketName, summary.getKey()).getSSEAlgorithm(), equalTo("AES256")); + } - logger.info("--> delete some data"); - for (int i = 0; i < 50; i++) { - client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); - } - for (int i = 50; i < 100; i++) { - client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get(); - } - for (int i = 0; i < 100; i += 2) { - client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get(); - } - refresh(); - assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L)); - assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L)); - assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + logger.info("--> delete some data"); + for (int i = 0; i < 50; i++) { + client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); + } + for (int i = 50; i < 100; i++) { + client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get(); + } + for (int i = 0; i < 100; i += 2) { + client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get(); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); - logger.info("--> close indices"); - client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get(); + logger.info("--> close indices"); + client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get(); - logger.info("--> restore all indices from the snapshot"); - RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - ensureGreen(); - assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); - assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); - assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); - // Test restore after index deletion - logger.info("--> delete indices"); - cluster().wipeIndices("test-idx-1", "test-idx-2"); - logger.info("--> restore one index after deletion"); - restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - ensureGreen(); - assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); - ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); - assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); - assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + // Test restore after index deletion + logger.info("--> delete indices"); + cluster().wipeIndices("test-idx-1", "test-idx-2"); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); } /** @@ -254,9 +254,9 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket."); logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); client.admin().cluster().preparePutRepository("test-repo") - .setType("s3").setSettings(ImmutableSettings.settingsBuilder() - .put("base_path", basePath) - .put("bucket", bucketSettings.get("bucket")) + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("bucket", bucketSettings.get("bucket")) ).get(); fail("repository verification should have raise an exception!"); } @@ -273,7 +273,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { .put("access_key", bucketSettings.get("access_key")) .put("secret_key", bucketSettings.get("secret_key")) .put("bucket", bucketSettings.get("bucket")) - ).get(); + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertRepositoryIsOperational(client, "test-repo"); @@ -291,7 +291,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { .put("access_key", bucketSettings.get("access_key")) .put("secret_key", bucketSettings.get("secret_key")) .put("base_path", basePath) - ).get(); + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertRepositoryIsOperational(client, "test-repo"); } @@ -306,11 +306,11 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket."); logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); client.admin().cluster().preparePutRepository("test-repo") - .setType("s3").setSettings(ImmutableSettings.settingsBuilder() - .put("base_path", basePath) - .put("bucket", bucketSettings.get("bucket")) -// Below setting intentionally omitted to assert bucket is not available in default region. -// .put("region", privateBucketSettings.get("region")) + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("bucket", bucketSettings.get("bucket")) + // Below setting intentionally omitted to assert bucket is not available in default region. + // .put("region", privateBucketSettings.get("region")) ).get(); fail("repository verification should have raise an exception!"); @@ -327,7 +327,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { .put("base_path", basePath) .put("bucket", bucketSettings.get("bucket")) .put("region", bucketSettings.get("region")) - ).get(); + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); assertRepositoryIsOperational(client, "test-repo"); @@ -342,8 +342,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath); PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") .setType("s3").setSettings(ImmutableSettings.settingsBuilder() - .put("base_path", basePath) - ).get(); + .put("base_path", basePath) + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); logger.info("--> restore non existing snapshot"); @@ -364,8 +364,8 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { logger.info("--> creating s3 repository without any path"); PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo") .setType("s3").setSettings(ImmutableSettings.settingsBuilder() - .put("base_path", basePath) - ).get(); + .put("base_path", basePath) + ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); try { @@ -383,7 +383,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { } } - private void assertRepositoryIsOperational(Client client, String repository) { + private void assertRepositoryIsOperational(Client client, String repository) { createIndex("test-idx-1"); ensureGreen(); @@ -447,7 +447,7 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest { settings.getByPrefix("repositories.s3.private-bucket."), settings.getByPrefix("repositories.s3.remote-bucket."), settings.getByPrefix("repositories.s3.external-bucket.") - }; + }; for (Settings bucket : buckets) { String endpoint = bucket.get("endpoint", settings.get("repositories.s3.endpoint")); String protocol = bucket.get("protocol", settings.get("repositories.s3.protocol"));