This reverts commit 23cccf088810b8416ed278571352393cc2de9523. Unfortunately SAS token auth still doesn't work with bulk deletes so we can't use them yet. Closes #54080
This commit is contained in:
parent
e380a5a8c3
commit
4271963462
|
@ -88,5 +88,7 @@ testClusters.integTest {
|
||||||
// in a hacky way to change the protocol and endpoint. We must fix that.
|
// in a hacky way to change the protocol and endpoint. We must fix that.
|
||||||
setting 'azure.client.integration_test.endpoint_suffix',
|
setting 'azure.client.integration_test.endpoint_suffix',
|
||||||
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
|
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
|
||||||
|
String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
|
||||||
|
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,12 +23,17 @@ import com.microsoft.azure.storage.LocationMode;
|
||||||
import com.microsoft.azure.storage.StorageException;
|
import com.microsoft.azure.storage.StorageException;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
|
import org.elasticsearch.action.support.GroupedActionListener;
|
||||||
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -37,18 +42,20 @@ import java.net.URISyntaxException;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
public class AzureBlobContainer extends AbstractBlobContainer {
|
public class AzureBlobContainer extends AbstractBlobContainer {
|
||||||
|
|
||||||
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
|
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
|
||||||
private final AzureBlobStore blobStore;
|
private final AzureBlobStore blobStore;
|
||||||
|
private final ThreadPool threadPool;
|
||||||
private final String keyPath;
|
private final String keyPath;
|
||||||
|
|
||||||
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
|
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
|
||||||
super(path);
|
super(path);
|
||||||
this.blobStore = blobStore;
|
this.blobStore = blobStore;
|
||||||
this.keyPath = path.buildAsString();
|
this.keyPath = path.buildAsString();
|
||||||
|
this.threadPool = threadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean blobExists(String blobName) {
|
private boolean blobExists(String blobName) {
|
||||||
|
@ -105,7 +112,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
||||||
@Override
|
@Override
|
||||||
public DeleteResult delete() throws IOException {
|
public DeleteResult delete() throws IOException {
|
||||||
try {
|
try {
|
||||||
return blobStore.deleteBlobDirectory(keyPath);
|
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
|
||||||
} catch (URISyntaxException | StorageException e) {
|
} catch (URISyntaxException | StorageException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
|
@ -113,9 +120,33 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
|
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
|
||||||
|
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
|
||||||
|
if (blobNames.isEmpty()) {
|
||||||
|
result.onResponse(null);
|
||||||
|
} else {
|
||||||
|
final GroupedActionListener<Void> listener =
|
||||||
|
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
|
||||||
|
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
|
||||||
|
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
|
||||||
|
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
|
||||||
|
for (String blobName : blobNames) {
|
||||||
|
executor.execute(ActionRunnable.run(listener, () -> {
|
||||||
|
logger.trace("deleteBlob({})", blobName);
|
||||||
try {
|
try {
|
||||||
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
|
blobStore.deleteBlob(buildKey(blobName));
|
||||||
} catch (URISyntaxException | StorageException e) {
|
} catch (StorageException e) {
|
||||||
|
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
result.actionGet();
|
||||||
|
} catch (Exception e) {
|
||||||
throw new IOException("Exception during bulk delete", e);
|
throw new IOException("Exception during bulk delete", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,13 +28,14 @@ import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||||
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
|
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -43,15 +44,17 @@ import static java.util.Collections.emptyMap;
|
||||||
public class AzureBlobStore implements BlobStore {
|
public class AzureBlobStore implements BlobStore {
|
||||||
|
|
||||||
private final AzureStorageService service;
|
private final AzureStorageService service;
|
||||||
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
private final String clientName;
|
private final String clientName;
|
||||||
private final String container;
|
private final String container;
|
||||||
private final LocationMode locationMode;
|
private final LocationMode locationMode;
|
||||||
|
|
||||||
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
|
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
|
||||||
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
|
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
|
||||||
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
|
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
|
||||||
this.service = service;
|
this.service = service;
|
||||||
|
this.threadPool = threadPool;
|
||||||
// locationMode is set per repository, not per client
|
// locationMode is set per repository, not per client
|
||||||
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
|
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
|
||||||
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
|
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
|
||||||
|
@ -77,7 +80,7 @@ public class AzureBlobStore implements BlobStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlobContainer blobContainer(BlobPath path) {
|
public BlobContainer blobContainer(BlobPath path) {
|
||||||
return new AzureBlobContainer(path, this);
|
return new AzureBlobContainer(path, this, threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -88,12 +91,13 @@ public class AzureBlobStore implements BlobStore {
|
||||||
return service.blobExists(clientName, container, blob);
|
return service.blobExists(clientName, container, blob);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
|
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
|
||||||
service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
|
service.deleteBlob(clientName, container, blob);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
|
public DeleteResult deleteBlobDirectory(String path, Executor executor)
|
||||||
return service.deleteBlobDirectory(clientName, container, path);
|
throws URISyntaxException, StorageException, IOException {
|
||||||
|
return service.deleteBlobDirectory(clientName, container, path, executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
|
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
|
||||||
|
@ -107,7 +111,7 @@ public class AzureBlobStore implements BlobStore {
|
||||||
|
|
||||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
|
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||||
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
|
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
|
||||||
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
|
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class AzureRepository extends BlobStoreRepository {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected AzureBlobStore createBlobStore() {
|
protected AzureBlobStore createBlobStore() {
|
||||||
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
|
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
|
||||||
|
|
||||||
logger.debug(() -> new ParameterizedMessage(
|
logger.debug(() -> new ParameterizedMessage(
|
||||||
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
|
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
|
||||||
|
|
|
@ -23,12 +23,16 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.settings.SettingsException;
|
import org.elasticsearch.common.settings.SettingsException;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.ReloadablePlugin;
|
import org.elasticsearch.plugins.ReloadablePlugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||||
|
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -39,6 +43,8 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
|
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
|
||||||
|
|
||||||
|
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
|
||||||
|
|
||||||
// protected for testing
|
// protected for testing
|
||||||
final AzureStorageService azureStoreService;
|
final AzureStorageService azureStoreService;
|
||||||
|
|
||||||
|
@ -74,6 +80,15 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
||||||
|
return Collections.singletonList(executorBuilder());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ExecutorBuilder<?> executorBuilder() {
|
||||||
|
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reload(Settings settings) {
|
public void reload(Settings settings) {
|
||||||
// secure settings should be readable
|
// secure settings should be readable
|
||||||
|
|
|
@ -20,16 +20,13 @@
|
||||||
package org.elasticsearch.repositories.azure;
|
package org.elasticsearch.repositories.azure;
|
||||||
|
|
||||||
import com.microsoft.azure.storage.AccessCondition;
|
import com.microsoft.azure.storage.AccessCondition;
|
||||||
import com.microsoft.azure.storage.BatchException;
|
|
||||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||||
import com.microsoft.azure.storage.Constants;
|
|
||||||
import com.microsoft.azure.storage.OperationContext;
|
import com.microsoft.azure.storage.OperationContext;
|
||||||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||||
import com.microsoft.azure.storage.RetryPolicy;
|
import com.microsoft.azure.storage.RetryPolicy;
|
||||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||||
import com.microsoft.azure.storage.StorageException;
|
import com.microsoft.azure.storage.StorageException;
|
||||||
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
|
|
||||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||||
|
@ -45,6 +42,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||||
|
@ -55,6 +53,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.settings.SettingsException;
|
import org.elasticsearch.common.settings.SettingsException;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -68,10 +67,9 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
@ -190,61 +188,72 @@ public class AzureStorageService {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
|
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
|
||||||
throws URISyntaxException, StorageException {
|
|
||||||
logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
|
|
||||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||||
// Container name must be lower case.
|
// Container name must be lower case.
|
||||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||||
final Iterator<String> blobIterator = blobs.iterator();
|
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
|
||||||
int currentBatchSize = 0;
|
SocketAccess.doPrivilegedVoidException(() -> {
|
||||||
while (blobIterator.hasNext()) {
|
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
||||||
final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
|
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
|
||||||
do {
|
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
|
||||||
batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
|
});
|
||||||
DeleteSnapshotsOption.NONE, null, null);
|
|
||||||
++currentBatchSize;
|
|
||||||
} while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
|
|
||||||
currentBatchSize = 0;
|
|
||||||
try {
|
|
||||||
SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
|
|
||||||
} catch (BatchException e) {
|
|
||||||
for (StorageException ex : e.getExceptions().values()) {
|
|
||||||
if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
|
|
||||||
logger.error("Batch exceptions [{}]", e.getExceptions());
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DeleteResult deleteBlobDirectory(String account, String container, String path)
|
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
|
||||||
throws URISyntaxException, StorageException, IOException {
|
throws URISyntaxException, StorageException, IOException {
|
||||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||||
|
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
final AtomicLong outstanding = new AtomicLong(1L);
|
||||||
|
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
|
||||||
final AtomicLong blobsDeleted = new AtomicLong();
|
final AtomicLong blobsDeleted = new AtomicLong();
|
||||||
final AtomicLong bytesDeleted = new AtomicLong();
|
final AtomicLong bytesDeleted = new AtomicLong();
|
||||||
final List<String> blobsToDelete = new ArrayList<>();
|
|
||||||
SocketAccess.doPrivilegedVoidException(() -> {
|
SocketAccess.doPrivilegedVoidException(() -> {
|
||||||
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
||||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||||
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
|
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
|
||||||
|
outstanding.incrementAndGet();
|
||||||
|
executor.execute(new AbstractRunnable() {
|
||||||
|
@Override
|
||||||
|
protected void doRun() throws Exception {
|
||||||
final long len;
|
final long len;
|
||||||
if (blobItem instanceof CloudBlob) {
|
if (blobItem instanceof CloudBlob) {
|
||||||
len = ((CloudBlob) blobItem).getProperties().getLength();
|
len = ((CloudBlob) blobItem).getProperties().getLength();
|
||||||
} else {
|
} else {
|
||||||
len = -1L;
|
len = -1L;
|
||||||
}
|
}
|
||||||
blobsToDelete.add(blobPath);
|
deleteBlob(account, container, blobPath);
|
||||||
blobsDeleted.incrementAndGet();
|
blobsDeleted.incrementAndGet();
|
||||||
if (len >= 0) {
|
if (len >= 0) {
|
||||||
bytesDeleted.addAndGet(len);
|
bytesDeleted.addAndGet(len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAfter() {
|
||||||
|
if (outstanding.decrementAndGet() == 0) {
|
||||||
|
result.onResponse(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
|
}
|
||||||
|
});
|
||||||
|
if (outstanding.decrementAndGet() == 0) {
|
||||||
|
result.onResponse(null);
|
||||||
|
}
|
||||||
|
result.actionGet();
|
||||||
|
if (exceptions.isEmpty() == false) {
|
||||||
|
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
|
||||||
|
exceptions.forEach(ex::addSuppressed);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
|
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,8 @@ import org.elasticsearch.mocksocket.MockHttpServer;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.RestUtils;
|
import org.elasticsearch.rest.RestUtils;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -62,6 +64,7 @@ import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
@ -88,9 +91,11 @@ import static org.hamcrest.Matchers.lessThan;
|
||||||
public class AzureBlobContainerRetriesTests extends ESTestCase {
|
public class AzureBlobContainerRetriesTests extends ESTestCase {
|
||||||
|
|
||||||
private HttpServer httpServer;
|
private HttpServer httpServer;
|
||||||
|
private ThreadPool threadPool;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
threadPool = new TestThreadPool(getTestClass().getName(), AzureRepositoryPlugin.executorBuilder());
|
||||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||||
httpServer.start();
|
httpServer.start();
|
||||||
super.setUp();
|
super.setUp();
|
||||||
|
@ -100,6 +105,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
httpServer.stop(0);
|
httpServer.stop(0);
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
|
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlobContainer createBlobContainer(final int maxRetries) {
|
private BlobContainer createBlobContainer(final int maxRetries) {
|
||||||
|
@ -139,7 +145,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
|
||||||
.put(ACCOUNT_SETTING.getKey(), clientName)
|
.put(ACCOUNT_SETTING.getKey(), clientName)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service));
|
return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service, threadPool), threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||||
return Collections.singletonMap("/", new AzureBlobStoreHttpHandler("container"));
|
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -30,7 +30,7 @@ public class AzureHttpFixture {
|
||||||
|
|
||||||
private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
|
private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
|
||||||
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
|
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
|
||||||
server.createContext("/", new AzureHttpHandler(container));
|
server.createContext("/" + container, new AzureHttpHandler(container));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void start() throws Exception {
|
private void start() throws Exception {
|
||||||
|
|
|
@ -22,21 +22,16 @@ import com.sun.net.httpserver.Headers;
|
||||||
import com.sun.net.httpserver.HttpExchange;
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
import com.sun.net.httpserver.HttpHandler;
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.UUIDs;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.RestUtils;
|
import org.elasticsearch.rest.RestUtils;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.io.Writer;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -72,21 +67,18 @@ public class AzureHttpHandler implements HttpHandler {
|
||||||
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
|
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// Request body is closed in the finally block
|
|
||||||
final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody()));
|
|
||||||
if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
|
if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
|
||||||
// Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
|
// Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
|
||||||
final Map<String, String> params = new HashMap<>();
|
final Map<String, String> params = new HashMap<>();
|
||||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||||
|
|
||||||
final String blockId = params.get("blockid");
|
final String blockId = params.get("blockid");
|
||||||
blobs.put(blockId, requestBody);
|
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
|
||||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||||
|
|
||||||
} else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
|
} else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
|
||||||
// Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
|
// Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
|
||||||
final String blockList =
|
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
|
||||||
Streams.copyToString(new InputStreamReader(requestBody.streamInput(), StandardCharsets.UTF_8));
|
|
||||||
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
|
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
|
||||||
.filter(line -> line.contains("</Latest>"))
|
.filter(line -> line.contains("</Latest>"))
|
||||||
.map(line -> line.substring(0, line.indexOf("</Latest>")))
|
.map(line -> line.substring(0, line.indexOf("</Latest>")))
|
||||||
|
@ -105,12 +97,12 @@ public class AzureHttpHandler implements HttpHandler {
|
||||||
// PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
|
// PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
|
||||||
final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
|
final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
|
||||||
if ("*".equals(ifNoneMatch)) {
|
if ("*".equals(ifNoneMatch)) {
|
||||||
if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), requestBody) != null) {
|
if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) {
|
||||||
sendError(exchange, RestStatus.CONFLICT);
|
sendError(exchange, RestStatus.CONFLICT);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
blobs.put(exchange.getRequestURI().getPath(), requestBody);
|
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
|
||||||
}
|
}
|
||||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||||
|
|
||||||
|
@ -198,45 +190,6 @@ public class AzureHttpHandler implements HttpHandler {
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||||
exchange.getResponseBody().write(response);
|
exchange.getResponseBody().write(response);
|
||||||
|
|
||||||
} else if (Regex.simpleMatch("POST /?comp=batch", request)) {
|
|
||||||
// Batch Delete (https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch)
|
|
||||||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestBody.streamInput()))) {
|
|
||||||
final Set<String> toDelete = reader.lines().filter(l -> l.startsWith("DELETE"))
|
|
||||||
.map(l -> l.split(" ")[1]).collect(Collectors.toSet());
|
|
||||||
final BytesStreamOutput baos = new BytesStreamOutput();
|
|
||||||
final String batchSeparator = "batchresponse_" + UUIDs.randomBase64UUID();
|
|
||||||
try (Writer writer = new OutputStreamWriter(baos)) {
|
|
||||||
int contentId = 0;
|
|
||||||
for (String b : toDelete) {
|
|
||||||
writer.write("\r\n--" + batchSeparator + "\r\n" +
|
|
||||||
"Content-Type: application/http \r\n" +
|
|
||||||
"Content-ID: " + contentId++ + " \r\n");
|
|
||||||
if (blobs.remove(b) == null) {
|
|
||||||
writer.write("\r\nHTTP/1.1 404 The specified blob does not exist. \r\n" +
|
|
||||||
"x-ms-error-code: BlobNotFound \r\n" +
|
|
||||||
"x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
|
|
||||||
"x-ms-version: 2018-11-09\r\n" +
|
|
||||||
"Content-Length: 216 \r\n" +
|
|
||||||
"Content-Type: application/xml\r\n\r\n" +
|
|
||||||
"<?xml version=\"1.0\" encoding=\"utf-8\"?> \r\n" +
|
|
||||||
"<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\r\n" +
|
|
||||||
"RequestId:" + UUIDs.randomBase64UUID() + "\r\n" +
|
|
||||||
"Time:2020-01-01T01:01:01.0000000Z</Message></Error>\r\n");
|
|
||||||
} else {
|
|
||||||
writer.write(
|
|
||||||
"\r\nHTTP/1.1 202 Accepted \r\n" +
|
|
||||||
"x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
|
|
||||||
"x-ms-version: 2018-11-09\r\n\r\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.write("--" + batchSeparator + "--");
|
|
||||||
}
|
|
||||||
final Headers headers = exchange.getResponseHeaders();
|
|
||||||
headers.add("Content-Type",
|
|
||||||
"multipart/mixed; boundary=" + batchSeparator);
|
|
||||||
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), baos.size());
|
|
||||||
baos.bytes().writeTo(exchange.getResponseBody());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
sendError(exchange, RestStatus.BAD_REQUEST);
|
sendError(exchange, RestStatus.BAD_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue