diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 89ea3e55bfb..da2fc588e4d 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -46,7 +47,6 @@ import org.junit.Before; import java.io.IOException; import java.io.InputStream; -import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; @@ -99,13 +99,10 @@ public class S3BlobContainerRetriesTests extends ESTestCase { final Settings.Builder clientSettings = Settings.builder(); final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); - final String endpoint; - if (httpServer.getAddress().getAddress() instanceof Inet6Address) { - endpoint = "http://[" + httpServer.getAddress().getHostString() + "]:" + httpServer.getAddress().getPort(); - } else { - endpoint = "http://" + httpServer.getAddress().getHostString() + ":" + httpServer.getAddress().getPort(); - } + final InetSocketAddress address = httpServer.getAddress(); + final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), endpoint); + if (maxRetries != null) { clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries); } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index efb07759c2b..dd622a11c2f 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -18,53 +18,70 @@ */ package org.elasticsearch.repositories.s3; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.StorageClass; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.env.Environment; +import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.RestUtils; import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Locale; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.Matchers.nullValue; + +@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { - private static final ConcurrentMap blobs = new ConcurrentHashMap<>(); - private static String bucket; - private static ByteSizeValue bufferSize; - private static boolean serverSideEncryption; - private static String cannedACL; - private static String storageClass; + private static HttpServer httpServer; @BeforeClass - public static void setUpRepositorySettings() { - bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); - bufferSize = new ByteSizeValue(randomIntBetween(5, 50), ByteSizeUnit.MB); - serverSideEncryption = randomBoolean(); - if (randomBoolean()) { - cannedACL = randomFrom(CannedAccessControlList.values()).toString(); - } - if (randomBoolean()) { - storageClass = randomValueOtherThan(StorageClass.Glacier, () -> randomFrom(StorageClass.values())).toString(); - } + public static void startHttpServer() throws Exception { + httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.start(); + } + + @Before + public void setUpHttpServer() { + httpServer.createContext("/bucket", new InternalHttpHandler()); + } + + @AfterClass + public static void stopHttpServer() { + httpServer.stop(0); + httpServer = null; } @After - public void wipeRepository() { - blobs.clear(); + public void tearDownHttpServer() { + httpServer.removeContext("/bucket"); } @Override @@ -75,11 +92,8 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa @Override protected Settings repositorySettings() { return Settings.builder() - .put(S3Repository.BUCKET_SETTING.getKey(), bucket) - .put(S3Repository.BUFFER_SIZE_SETTING.getKey(), bufferSize) - .put(S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), serverSideEncryption) - .put(S3Repository.CANNED_ACL_SETTING.getKey(), cannedACL) - .put(S3Repository.STORAGE_CLASS_SETTING.getKey(), storageClass) + .put(S3Repository.BUCKET_SETTING.getKey(), "bucket") + .put(S3Repository.CLIENT_NAME.getKey(), "test") .build(); } @@ -88,6 +102,25 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa return Collections.singletonList(TestS3RepositoryPlugin.class); } + @Override + protected Settings nodeSettings(int nodeOrdinal) { + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "access"); + secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); + + final InetSocketAddress address = httpServer.getAddress(); + final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); + + return Settings.builder() + .put(Settings.builder() + .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint) + .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) + .build()) + .put(super.nodeSettings(nodeOrdinal)) + .setSecureSettings(secureSettings) + .build(); + } + public static class TestS3RepositoryPlugin extends S3RepositoryPlugin { public TestS3RepositoryPlugin(final Settings settings) { @@ -95,15 +128,105 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry, - final ThreadPool threadPool) { - return Collections.singletonMap(S3Repository.TYPE, - metadata -> new S3Repository(metadata, registry, new S3Service() { - @Override - AmazonS3 buildClient(S3ClientSettings clientSettings) { - return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass); + public List> getSettings() { + final List> settings = new ArrayList<>(super.getSettings()); + // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side + settings.add(S3ClientSettings.DISABLE_CHUNKED_ENCODING); + return settings; + } + } + + /** + * Minimal HTTP handler that acts as a S3 compliant server + */ + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") + private static class InternalHttpHandler implements HttpHandler { + + private final ConcurrentMap blobs = new ConcurrentHashMap<>(); + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); + try { + if (Regex.simpleMatch("PUT /bucket/*", request)) { + blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody())); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + + } else if (Regex.simpleMatch("GET /bucket/?prefix=*", request)) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + assertThat("Test must be adapted for GET Bucket (List Objects) Version 2", params.get("list-type"), nullValue()); + + final StringBuilder list = new StringBuilder(); + list.append(""); + list.append(""); + final String prefix = params.get("prefix"); + if (prefix != null) { + list.append("").append(prefix).append(""); + } + for (Map.Entry blob : blobs.entrySet()) { + if (prefix == null || blob.getKey().startsWith("/bucket/" + prefix)) { + list.append(""); + list.append("").append(blob.getKey().replace("/bucket/", "")).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append(""); } - }, threadPool)); + } + list.append(""); + + byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("GET /bucket/*", request)) { + final BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + if (blob != null) { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length()); + blob.writeTo(exchange.getResponseBody()); + } else { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } + + } else if (Regex.simpleMatch("DELETE /bucket/*", request)) { + int deletions = 0; + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); + if (blob.getKey().startsWith(exchange.getRequestURI().toString())) { + iterator.remove(); + deletions++; + } + } + exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1); + + } else if (Regex.simpleMatch("POST /bucket/?delete", request)) { + final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8)); + + final StringBuilder deletes = new StringBuilder(); + deletes.append(""); + deletes.append(""); + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); + String key = blob.getKey().replace("/bucket/", ""); + if (requestBody.contains("" + key + "")) { + deletes.append("").append(key).append(""); + iterator.remove(); + } + } + deletes.append(""); + + byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else { + exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1); + } + } finally { + exchange.close(); + } } } }