From 427196346290fd9dece6bb693b05146c74b045a9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 25 Mar 2020 12:13:25 +0100 Subject: [PATCH] Revert "Use Azure Bulk Deletes in Azure Repository (#53919)" (#54089) (#54111) This reverts commit 23cccf088810b8416ed278571352393cc2de9523. Unfortunately SAS token auth still doesn't work with bulk deletes so we can't use them yet. Closes #54080 --- .../qa/microsoft-azure-storage/build.gradle | 2 + .../azure/AzureBlobContainer.java | 41 +++++++- .../repositories/azure/AzureBlobStore.java | 20 ++-- .../repositories/azure/AzureRepository.java | 2 +- .../azure/AzureRepositoryPlugin.java | 15 +++ .../azure/AzureStorageService.java | 97 ++++++++++--------- .../azure/AzureBlobContainerRetriesTests.java | 8 +- .../azure/AzureBlobStoreRepositoryTests.java | 2 +- .../java/fixture/azure/AzureHttpFixture.java | 2 +- .../java/fixture/azure/AzureHttpHandler.java | 55 +---------- 10 files changed, 132 insertions(+), 112 deletions(-) diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle index 3e5cd7758d2..b594ff3b172 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle +++ b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle @@ -88,5 +88,7 @@ testClusters.integTest { // in a hacky way to change the protocol and endpoint. We must fix that. setting 'azure.client.integration_test.endpoint_suffix', { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE + String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0) + setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 6bd480f7923..2093139e115 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -23,12 +23,17 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -37,18 +42,20 @@ import java.net.URISyntaxException; import java.nio.file.NoSuchFileException; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutorService; public class AzureBlobContainer extends AbstractBlobContainer { private final Logger logger = LogManager.getLogger(AzureBlobContainer.class); private final AzureBlobStore blobStore; + private final ThreadPool threadPool; private final String keyPath; - AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) { + AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) { super(path); this.blobStore = blobStore; this.keyPath = path.buildAsString(); + this.threadPool = threadPool; } private boolean blobExists(String blobName) { @@ -105,7 +112,7 @@ public class AzureBlobContainer extends AbstractBlobContainer { @Override public DeleteResult delete() throws IOException { try { - return blobStore.deleteBlobDirectory(keyPath); + return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); } catch (URISyntaxException | StorageException e) { throw new IOException(e); } @@ -113,9 +120,33 @@ public class AzureBlobContainer extends AbstractBlobContainer { @Override public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + final PlainActionFuture result = PlainActionFuture.newFuture(); + if (blobNames.isEmpty()) { + result.onResponse(null); + } else { + final GroupedActionListener listener = + new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size()); + final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint + // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. + for (String blobName : blobNames) { + executor.execute(ActionRunnable.run(listener, () -> { + logger.trace("deleteBlob({})", blobName); + try { + blobStore.deleteBlob(buildKey(blobName)); + } catch (StorageException e) { + if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw new IOException(e); + } + } catch (URISyntaxException e) { + throw new IOException(e); + } + })); + } + } try { - blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList())); - } catch (URISyntaxException | StorageException e) { + result.actionGet(); + } catch (Exception e) { throw new IOException("Exception during bulk delete", e); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 173f13d801f..714e29edea2 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -28,13 +28,14 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.repositories.azure.AzureRepository.Repository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -43,15 +44,17 @@ import static java.util.Collections.emptyMap; public class AzureBlobStore implements BlobStore { private final AzureStorageService service; + private final ThreadPool threadPool; private final String clientName; private final String container; private final LocationMode locationMode; - public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) { + public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) { this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); this.clientName = Repository.CLIENT_NAME.get(metadata.settings()); this.service = service; + this.threadPool = threadPool; // locationMode is set per repository, not per client this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings()); final Map prevSettings = this.service.refreshAndClearCache(emptyMap()); @@ -77,7 +80,7 @@ public class AzureBlobStore implements BlobStore { @Override public BlobContainer blobContainer(BlobPath path) { - return new AzureBlobContainer(path, this); + return new AzureBlobContainer(path, this, threadPool); } @Override @@ -88,12 +91,13 @@ public class AzureBlobStore implements BlobStore { return service.blobExists(clientName, container, blob); } - public void deleteBlobsIgnoringIfNotExists(Collection blobs) throws URISyntaxException, StorageException { - service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs); + public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { + service.deleteBlob(clientName, container, blob); } - public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException { - return service.deleteBlobDirectory(clientName, container, path); + public DeleteResult deleteBlobDirectory(String path, Executor executor) + throws URISyntaxException, StorageException, IOException { + return service.deleteBlobDirectory(clientName, container, path, executor); } public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { @@ -107,7 +111,7 @@ public class AzureBlobStore implements BlobStore { public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( - Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this)))); + Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index c39f951db56..e07ffedb444 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -115,7 +115,7 @@ public class AzureRepository extends BlobStoreRepository { @Override protected AzureBlobStore createBlobStore() { - final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService); + final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool); logger.debug(() -> new ParameterizedMessage( "using container [{}], chunk_size [{}], compress [{}], base_path [{}]", diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index ae1258a73b4..d98a7c3cbd7 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -23,12 +23,16 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilder; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -39,6 +43,8 @@ import java.util.Map; */ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { + public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure"; + // protected for testing final AzureStorageService azureStoreService; @@ -74,6 +80,15 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R ); } + @Override + public List> getExecutorBuilders(Settings settings) { + return Collections.singletonList(executorBuilder()); + } + + public static ExecutorBuilder executorBuilder() { + return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L)); + } + @Override public void reload(Settings settings) { // secure settings should be readable diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index a7ddcbf48e1..ca7cb3475b4 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -20,16 +20,13 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.AccessCondition; -import com.microsoft.azure.storage.BatchException; import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; @@ -45,6 +42,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; @@ -55,6 +53,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import java.io.IOException; import java.io.InputStream; @@ -68,10 +67,9 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -190,61 +188,72 @@ public class AzureStorageService { }); } - public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection blobs) - throws URISyntaxException, StorageException { - logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs)); + public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException { final Tuple> client = client(account); // Container name must be lower case. final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); - final Iterator blobIterator = blobs.iterator(); - int currentBatchSize = 0; - while (blobIterator.hasNext()) { - final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation(); - do { - batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()), - DeleteSnapshotsOption.NONE, null, null); - ++currentBatchSize; - } while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS); - currentBatchSize = 0; - try { - SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp)); - } catch (BatchException e) { - for (StorageException ex : e.getExceptions().values()) { - if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - logger.error("Batch exceptions [{}]", e.getExceptions()); - throw e; - } - } - } - } + logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob)); + SocketAccess.doPrivilegedVoidException(() -> { + final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob); + logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob)); + azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get()); + }); } - DeleteResult deleteBlobDirectory(String account, String container, String path) + DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); + final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); + final AtomicLong outstanding = new AtomicLong(1L); + final PlainActionFuture result = PlainActionFuture.newFuture(); final AtomicLong blobsDeleted = new AtomicLong(); final AtomicLong bytesDeleted = new AtomicLong(); - final List blobsToDelete = new ArrayList<>(); SocketAccess.doPrivilegedVoidException(() -> { - for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { + for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); - final long len; - if (blobItem instanceof CloudBlob) { - len = ((CloudBlob) blobItem).getProperties().getLength(); - } else { - len = -1L; - } - blobsToDelete.add(blobPath); - blobsDeleted.incrementAndGet(); - if (len >= 0) { - bytesDeleted.addAndGet(len); - } + outstanding.incrementAndGet(); + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + final long len; + if (blobItem instanceof CloudBlob) { + len = ((CloudBlob) blobItem).getProperties().getLength(); + } else { + len = -1L; + } + deleteBlob(account, container, blobPath); + blobsDeleted.incrementAndGet(); + if (len >= 0) { + bytesDeleted.addAndGet(len); + } + } + + @Override + public void onFailure(Exception e) { + exceptions.add(e); + } + + @Override + public void onAfter() { + if (outstanding.decrementAndGet() == 0) { + result.onResponse(null); + } + } + }); } }); - deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete); + if (outstanding.decrementAndGet() == 0) { + result.onResponse(null); + } + result.actionGet(); + if (exceptions.isEmpty() == false) { + final IOException ex = new IOException("Deleting directory [" + path + "] failed"); + exceptions.forEach(ex::addSuppressed); + throw ex; + } return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index b8ec6115b4d..ce3cba065c3 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -44,6 +44,8 @@ import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; @@ -62,6 +64,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; @@ -88,9 +91,11 @@ import static org.hamcrest.Matchers.lessThan; public class AzureBlobContainerRetriesTests extends ESTestCase { private HttpServer httpServer; + private ThreadPool threadPool; @Before public void setUp() throws Exception { + threadPool = new TestThreadPool(getTestClass().getName(), AzureRepositoryPlugin.executorBuilder()); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); super.setUp(); @@ -100,6 +105,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { public void tearDown() throws Exception { httpServer.stop(0); super.tearDown(); + ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS); } private BlobContainer createBlobContainer(final int maxRetries) { @@ -139,7 +145,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { .put(ACCOUNT_SETTING.getKey(), clientName) .build()); - return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service)); + return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service, threadPool), threadPool); } public void testReadNonexistentBlobThrowsNoSuchFileException() { diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 47703b90e43..b23693fd268 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/", new AzureBlobStoreHttpHandler("container")); + return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); } @Override diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java index 5498c4ebc03..1def1439429 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java @@ -30,7 +30,7 @@ public class AzureHttpFixture { private AzureHttpFixture(final String address, final int port, final String container) throws IOException { this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0); - server.createContext("/", new AzureHttpHandler(container)); + server.createContext("/" + container, new AzureHttpHandler(container)); } private void start() throws Exception { diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index 6cd93fc9347..7a94a8c9f2e 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -22,21 +22,16 @@ import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; -import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -72,21 +67,18 @@ public class AzureHttpHandler implements HttpHandler { assert read == -1 : "Request body should have been empty but saw [" + read + "]"; } try { - // Request body is closed in the finally block - final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())); if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) { // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) final Map params = new HashMap<>(); RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); final String blockId = params.get("blockid"); - blobs.put(blockId, requestBody); + blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) { // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list) - final String blockList = - Streams.copyToString(new InputStreamReader(requestBody.streamInput(), StandardCharsets.UTF_8)); + final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); final List blockIds = Arrays.stream(blockList.split("")) .filter(line -> line.contains("")) .map(line -> line.substring(0, line.indexOf(""))) @@ -105,12 +97,12 @@ public class AzureHttpHandler implements HttpHandler { // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob) final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match"); if ("*".equals(ifNoneMatch)) { - if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), requestBody) != null) { + if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) { sendError(exchange, RestStatus.CONFLICT); return; } } else { - blobs.put(exchange.getRequestURI().getPath(), requestBody); + blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); } exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); @@ -198,45 +190,6 @@ public class AzureHttpHandler implements HttpHandler { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); - } else if (Regex.simpleMatch("POST /?comp=batch", request)) { - // Batch Delete (https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch) - try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestBody.streamInput()))) { - final Set toDelete = reader.lines().filter(l -> l.startsWith("DELETE")) - .map(l -> l.split(" ")[1]).collect(Collectors.toSet()); - final BytesStreamOutput baos = new BytesStreamOutput(); - final String batchSeparator = "batchresponse_" + UUIDs.randomBase64UUID(); - try (Writer writer = new OutputStreamWriter(baos)) { - int contentId = 0; - for (String b : toDelete) { - writer.write("\r\n--" + batchSeparator + "\r\n" + - "Content-Type: application/http \r\n" + - "Content-ID: " + contentId++ + " \r\n"); - if (blobs.remove(b) == null) { - writer.write("\r\nHTTP/1.1 404 The specified blob does not exist. \r\n" + - "x-ms-error-code: BlobNotFound \r\n" + - "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" + - "x-ms-version: 2018-11-09\r\n" + - "Content-Length: 216 \r\n" + - "Content-Type: application/xml\r\n\r\n" + - " \r\n" + - "BlobNotFoundThe specified blob does not exist.\r\n" + - "RequestId:" + UUIDs.randomBase64UUID() + "\r\n" + - "Time:2020-01-01T01:01:01.0000000Z\r\n"); - } else { - writer.write( - "\r\nHTTP/1.1 202 Accepted \r\n" + - "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" + - "x-ms-version: 2018-11-09\r\n\r\n"); - } - } - writer.write("--" + batchSeparator + "--"); - } - final Headers headers = exchange.getResponseHeaders(); - headers.add("Content-Type", - "multipart/mixed; boundary=" + batchSeparator); - exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), baos.size()); - baos.bytes().writeTo(exchange.getResponseBody()); - } } else { sendError(exchange, RestStatus.BAD_REQUEST); }