Use Azure Bulk Deletes in Azure Repository (#53919) (#53967)

Now that we upgraded the Azure SDK to 8.6.2 in #53865 we can make use of
bulk deletes.
This commit is contained in:
Armin Braun 2020-03-23 13:35:05 +01:00 committed by GitHub
parent aef7b89219
commit b51ea25a00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 112 additions and 132 deletions

View File

@ -88,7 +88,5 @@ testClusters.integTest {
// in a hacky way to change the protocol and endpoint. We must fix that. // in a hacky way to change the protocol and endpoint. We must fix that.
setting 'azure.client.integration_test.endpoint_suffix', setting 'azure.client.integration_test.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
} }
} }

View File

@ -23,17 +23,12 @@ import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -42,20 +37,18 @@ import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.stream.Collectors;
public class AzureBlobContainer extends AbstractBlobContainer { public class AzureBlobContainer extends AbstractBlobContainer {
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class); private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
private final AzureBlobStore blobStore; private final AzureBlobStore blobStore;
private final ThreadPool threadPool;
private final String keyPath; private final String keyPath;
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) { AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path); super(path);
this.blobStore = blobStore; this.blobStore = blobStore;
this.keyPath = path.buildAsString(); this.keyPath = path.buildAsString();
this.threadPool = threadPool;
} }
private boolean blobExists(String blobName) { private boolean blobExists(String blobName) {
@ -112,7 +105,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public DeleteResult delete() throws IOException { public DeleteResult delete() throws IOException {
try { try {
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); return blobStore.deleteBlobDirectory(keyPath);
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
throw new IOException(e); throw new IOException(e);
} }
@ -120,33 +113,9 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException { public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
if (blobNames.isEmpty()) {
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.execute(ActionRunnable.run(listener, () -> {
logger.trace("deleteBlob({})", blobName);
try { try {
blobStore.deleteBlob(buildKey(blobName)); blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
} catch (StorageException e) { } catch (URISyntaxException | StorageException e) {
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw new IOException(e);
}
} catch (URISyntaxException e) {
throw new IOException(e);
}
}));
}
}
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during bulk delete", e); throw new IOException("Exception during bulk delete", e);
} }
} }

View File

@ -28,14 +28,13 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.repositories.azure.AzureRepository.Repository; import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -44,17 +43,15 @@ import static java.util.Collections.emptyMap;
public class AzureBlobStore implements BlobStore { public class AzureBlobStore implements BlobStore {
private final AzureStorageService service; private final AzureStorageService service;
private final ThreadPool threadPool;
private final String clientName; private final String clientName;
private final String container; private final String container;
private final LocationMode locationMode; private final LocationMode locationMode;
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) { public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings()); this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service; this.service = service;
this.threadPool = threadPool;
// locationMode is set per repository, not per client // locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings()); this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap()); final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@ -80,7 +77,7 @@ public class AzureBlobStore implements BlobStore {
@Override @Override
public BlobContainer blobContainer(BlobPath path) { public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this, threadPool); return new AzureBlobContainer(path, this);
} }
@Override @Override
@ -91,13 +88,12 @@ public class AzureBlobStore implements BlobStore {
return service.blobExists(clientName, container, blob); return service.blobExists(clientName, container, blob);
} }
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
service.deleteBlob(clientName, container, blob); service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
} }
public DeleteResult deleteBlobDirectory(String path, Executor executor) public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
throws URISyntaxException, StorageException, IOException { return service.deleteBlobDirectory(clientName, container, path);
return service.deleteBlobDirectory(clientName, container, path, executor);
} }
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
@ -111,7 +107,7 @@ public class AzureBlobStore implements BlobStore {
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException { public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
} }
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)

View File

@ -115,7 +115,7 @@ public class AzureRepository extends BlobStoreRepository {
@Override @Override
protected AzureBlobStore createBlobStore() { protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool); final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
logger.debug(() -> new ParameterizedMessage( logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]", "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

View File

@ -23,16 +23,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -43,8 +39,6 @@ import java.util.Map;
*/ */
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
// protected for testing // protected for testing
final AzureStorageService azureStoreService; final AzureStorageService azureStoreService;
@ -80,15 +74,6 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
); );
} }
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections.singletonList(executorBuilder());
}
public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
}
@Override @Override
public void reload(Settings settings) { public void reload(Settings settings) {
// secure settings should be readable // secure settings should be readable

View File

@ -20,13 +20,16 @@
package org.elasticsearch.repositories.azure; package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.BatchException;
import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy; import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.BlobProperties;
@ -42,7 +45,6 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.DeleteResult;
@ -53,7 +55,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -67,9 +68,10 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -188,72 +190,61 @@ public class AzureStorageService {
}); });
} }
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException { public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
throws URISyntaxException, StorageException {
logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account); final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
// Container name must be lower case. // Container name must be lower case.
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob)); final Iterator<String> blobIterator = blobs.iterator();
SocketAccess.doPrivilegedVoidException(() -> { int currentBatchSize = 0;
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob); while (blobIterator.hasNext()) {
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob)); final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get()); do {
}); batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
DeleteSnapshotsOption.NONE, null, null);
++currentBatchSize;
} while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
currentBatchSize = 0;
try {
SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
} catch (BatchException e) {
for (StorageException ex : e.getExceptions().values()) {
if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
logger.error("Batch exceptions [{}]", e.getExceptions());
throw e;
}
}
}
}
} }
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor) DeleteResult deleteBlobDirectory(String account, String container, String path)
throws URISyntaxException, StorageException, IOException { throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account); final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong(); final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong(); final AtomicLong bytesDeleted = new AtomicLong();
final List<String> blobsToDelete = new ArrayList<>();
SocketAccess.doPrivilegedVoidException(() -> { SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the / // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len; final long len;
if (blobItem instanceof CloudBlob) { if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength(); len = ((CloudBlob) blobItem).getProperties().getLength();
} else { } else {
len = -1L; len = -1L;
} }
deleteBlob(account, container, blobPath); blobsToDelete.add(blobPath);
blobsDeleted.incrementAndGet(); blobsDeleted.incrementAndGet();
if (len >= 0) { if (len >= 0) {
bytesDeleted.addAndGet(len); bytesDeleted.addAndGet(len);
} }
} }
@Override
public void onFailure(Exception e) {
exceptions.add(e);
}
@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
}); });
} deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
});
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
result.actionGet();
if (exceptions.isEmpty() == false) {
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
exceptions.forEach(ex::addSuppressed);
throw ex;
}
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
} }

View File

@ -44,8 +44,6 @@ import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -64,7 +62,6 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -91,11 +88,9 @@ import static org.hamcrest.Matchers.lessThan;
public class AzureBlobContainerRetriesTests extends ESTestCase { public class AzureBlobContainerRetriesTests extends ESTestCase {
private HttpServer httpServer; private HttpServer httpServer;
private ThreadPool threadPool;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
threadPool = new TestThreadPool(getTestClass().getName(), AzureRepositoryPlugin.executorBuilder());
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start(); httpServer.start();
super.setUp(); super.setUp();
@ -105,7 +100,6 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
public void tearDown() throws Exception { public void tearDown() throws Exception {
httpServer.stop(0); httpServer.stop(0);
super.tearDown(); super.tearDown();
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
} }
private BlobContainer createBlobContainer(final int maxRetries) { private BlobContainer createBlobContainer(final int maxRetries) {
@ -145,7 +139,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
.put(ACCOUNT_SETTING.getKey(), clientName) .put(ACCOUNT_SETTING.getKey(), clientName)
.build()); .build());
return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service, threadPool), threadPool); return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service));
} }
public void testReadNonexistentBlobThrowsNoSuchFileException() { public void testReadNonexistentBlobThrowsNoSuchFileException() {

View File

@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
@Override @Override
protected Map<String, HttpHandler> createHttpHandlers() { protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); return Collections.singletonMap("/", new AzureBlobStoreHttpHandler("container"));
} }
@Override @Override

View File

@ -30,7 +30,7 @@ public class AzureHttpFixture {
private AzureHttpFixture(final String address, final int port, final String container) throws IOException { private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0); this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
server.createContext("/" + container, new AzureHttpHandler(container)); server.createContext("/", new AzureHttpHandler(container));
} }
private void start() throws Exception { private void start() throws Exception {

View File

@ -22,16 +22,21 @@ import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -67,18 +72,21 @@ public class AzureHttpHandler implements HttpHandler {
assert read == -1 : "Request body should have been empty but saw [" + read + "]"; assert read == -1 : "Request body should have been empty but saw [" + read + "]";
} }
try { try {
// Request body is closed in the finally block
final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody()));
if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) { if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
// Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
final Map<String, String> params = new HashMap<>(); final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blockId = params.get("blockid"); final String blockId = params.get("blockid");
blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); blobs.put(blockId, requestBody);
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) { } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
// Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list) // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); final String blockList =
Streams.copyToString(new InputStreamReader(requestBody.streamInput(), StandardCharsets.UTF_8));
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>")) final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>")) .filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>"))) .map(line -> line.substring(0, line.indexOf("</Latest>")))
@ -97,12 +105,12 @@ public class AzureHttpHandler implements HttpHandler {
// PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob) // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match"); final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
if ("*".equals(ifNoneMatch)) { if ("*".equals(ifNoneMatch)) {
if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) { if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), requestBody) != null) {
sendError(exchange, RestStatus.CONFLICT); sendError(exchange, RestStatus.CONFLICT);
return; return;
} }
} else { } else {
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); blobs.put(exchange.getRequestURI().getPath(), requestBody);
} }
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
@ -190,6 +198,45 @@ public class AzureHttpHandler implements HttpHandler {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response); exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("POST /?comp=batch", request)) {
// Batch Delete (https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch)
try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestBody.streamInput()))) {
final Set<String> toDelete = reader.lines().filter(l -> l.startsWith("DELETE"))
.map(l -> l.split(" ")[1]).collect(Collectors.toSet());
final BytesStreamOutput baos = new BytesStreamOutput();
final String batchSeparator = "batchresponse_" + UUIDs.randomBase64UUID();
try (Writer writer = new OutputStreamWriter(baos)) {
int contentId = 0;
for (String b : toDelete) {
writer.write("\r\n--" + batchSeparator + "\r\n" +
"Content-Type: application/http \r\n" +
"Content-ID: " + contentId++ + " \r\n");
if (blobs.remove(b) == null) {
writer.write("\r\nHTTP/1.1 404 The specified blob does not exist. \r\n" +
"x-ms-error-code: BlobNotFound \r\n" +
"x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
"x-ms-version: 2018-11-09\r\n" +
"Content-Length: 216 \r\n" +
"Content-Type: application/xml\r\n\r\n" +
"<?xml version=\"1.0\" encoding=\"utf-8\"?> \r\n" +
"<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\r\n" +
"RequestId:" + UUIDs.randomBase64UUID() + "\r\n" +
"Time:2020-01-01T01:01:01.0000000Z</Message></Error>\r\n");
} else {
writer.write(
"\r\nHTTP/1.1 202 Accepted \r\n" +
"x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
"x-ms-version: 2018-11-09\r\n\r\n");
}
}
writer.write("--" + batchSeparator + "--");
}
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type",
"multipart/mixed; boundary=" + batchSeparator);
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), baos.size());
baos.bytes().writeTo(exchange.getResponseBody());
}
} else { } else {
sendError(exchange, RestStatus.BAD_REQUEST); sendError(exchange, RestStatus.BAD_REQUEST);
} }