Move azure client logic from AzureStorageService to AzureBlobStore (#56806)
Backport of #56782
This commit is contained in:
parent
27a090232e
commit
1530bff0cb
|
@ -19,8 +19,25 @@
|
|||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.LocationMode;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
|
@ -28,22 +45,39 @@ import org.elasticsearch.common.blobstore.BlobMetadata;
|
|||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class AzureBlobStore implements BlobStore {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
|
||||
|
||||
private final AzureStorageService service;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
|
@ -89,34 +123,190 @@ public class AzureBlobStore implements BlobStore {
|
|||
}
|
||||
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
return service.blobExists(clientName, container, blob);
|
||||
// Container name must be lower case.
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
return SocketAccess.doPrivilegedException(() -> {
|
||||
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
||||
return azureBlob.exists(null, null, client.v2().get());
|
||||
});
|
||||
}
|
||||
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
service.deleteBlob(clientName, container, blob);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
// Container name must be lower case.
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
||||
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
|
||||
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
|
||||
});
|
||||
}
|
||||
|
||||
public DeleteResult deleteBlobDirectory(String path, Executor executor)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
return service.deleteBlobDirectory(clientName, container, path, executor);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
|
||||
final AtomicLong outstanding = new AtomicLong(1L);
|
||||
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
|
||||
final AtomicLong blobsDeleted = new AtomicLong();
|
||||
final AtomicLong bytesDeleted = new AtomicLong();
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
|
||||
outstanding.incrementAndGet();
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final long len;
|
||||
if (blobItem instanceof CloudBlob) {
|
||||
len = ((CloudBlob) blobItem).getProperties().getLength();
|
||||
} else {
|
||||
len = -1L;
|
||||
}
|
||||
deleteBlob(blobPath);
|
||||
blobsDeleted.incrementAndGet();
|
||||
if (len >= 0) {
|
||||
bytesDeleted.addAndGet(len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
result.actionGet();
|
||||
if (exceptions.isEmpty() == false) {
|
||||
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
|
||||
exceptions.forEach(ex::addSuppressed);
|
||||
throw ex;
|
||||
}
|
||||
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
|
||||
}
|
||||
|
||||
public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException {
|
||||
return service.getInputStream(clientName, container, blob, position, length);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
|
||||
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
|
||||
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
|
||||
blockBlobReference.openInputStream(position, length, null, null, client.v2().get()));
|
||||
return giveSocketPermissionsToStream(is);
|
||||
}
|
||||
|
||||
public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
|
||||
// NOTE: this should be here: if (prefix == null) prefix = "";
|
||||
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
|
||||
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
|
||||
final Map<String, BlobMetadata> blobsBuilder = new HashMap<String, BlobMetadata>();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
|
||||
enumBlobListingDetails, null, client.v2().get())) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
|
||||
if (blobItem instanceof CloudBlob) {
|
||||
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
|
||||
final String name = blobPath.substring(keyPath.length());
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
|
||||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getLength()));
|
||||
}
|
||||
}
|
||||
});
|
||||
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
|
||||
}
|
||||
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
|
||||
final Set<String> blobsBuilder = new HashSet<String>();
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final String keyPath = path.buildAsString();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
|
||||
if (blobItem instanceof CloudBlobDirectory) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
|
||||
// Lastly, we add the length of keyPath to the offset to strip this container's path.
|
||||
final String uriPath = uri.getPath();
|
||||
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return Collections.unmodifiableMap(blobsBuilder.stream().collect(
|
||||
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
|
||||
}
|
||||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
assert inputStream.markSupported()
|
||||
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
try {
|
||||
final AccessCondition accessCondition =
|
||||
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
|
||||
SocketAccess.doPrivilegedVoidException(() ->
|
||||
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), client.v2().get()));
|
||||
} catch (final StorageException se) {
|
||||
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
|
||||
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
|
||||
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
|
||||
}
|
||||
throw se;
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
|
||||
}
|
||||
|
||||
private Tuple<CloudBlobClient, Supplier<OperationContext>> client() {
|
||||
return service.client(clientName);
|
||||
}
|
||||
|
||||
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
|
||||
return new InputStream() {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(stream::read);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(() -> stream.read(b));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,67 +19,30 @@
|
|||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||
import com.microsoft.azure.storage.RetryPolicy;
|
||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobMetadata;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class AzureStorageService {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(AzureStorageService.class);
|
||||
|
||||
public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
|
||||
/**
|
||||
* {@link com.microsoft.azure.storage.blob.BlobConstants#MAX_SINGLE_UPLOAD_BLOB_SIZE_IN_BYTES}
|
||||
|
@ -179,191 +142,8 @@ public class AzureStorageService {
|
|||
return splits[2];
|
||||
}
|
||||
|
||||
public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException {
|
||||
// Container name must be lower case.
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
return SocketAccess.doPrivilegedException(() -> {
|
||||
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
||||
return azureBlob.exists(null, null, client.v2().get());
|
||||
});
|
||||
}
|
||||
|
||||
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
// Container name must be lower case.
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
||||
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
|
||||
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
|
||||
});
|
||||
}
|
||||
|
||||
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
|
||||
final AtomicLong outstanding = new AtomicLong(1L);
|
||||
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
|
||||
final AtomicLong blobsDeleted = new AtomicLong();
|
||||
final AtomicLong bytesDeleted = new AtomicLong();
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
|
||||
outstanding.incrementAndGet();
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final long len;
|
||||
if (blobItem instanceof CloudBlob) {
|
||||
len = ((CloudBlob) blobItem).getProperties().getLength();
|
||||
} else {
|
||||
len = -1L;
|
||||
}
|
||||
deleteBlob(account, container, blobPath);
|
||||
blobsDeleted.incrementAndGet();
|
||||
if (len >= 0) {
|
||||
bytesDeleted.addAndGet(len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
result.actionGet();
|
||||
if (exceptions.isEmpty() == false) {
|
||||
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
|
||||
exceptions.forEach(ex::addSuppressed);
|
||||
throw ex;
|
||||
}
|
||||
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
|
||||
}
|
||||
|
||||
public InputStream getInputStream(String account, String container, String blob, long position, @Nullable Long length)
|
||||
throws URISyntaxException, StorageException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
|
||||
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
|
||||
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
|
||||
blockBlobReference.openInputStream(position, length, null, null, client.v2().get()));
|
||||
return giveSocketPermissionsToStream(is);
|
||||
}
|
||||
|
||||
public Map<String, BlobMetadata> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
// NOTE: this should be here: if (prefix == null) prefix = "";
|
||||
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
|
||||
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
|
||||
final MapBuilder<String, BlobMetadata> blobsBuilder = MapBuilder.newMapBuilder();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
|
||||
enumBlobListingDetails, null, client.v2().get())) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
|
||||
if (blobItem instanceof CloudBlob) {
|
||||
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
|
||||
final String name = blobPath.substring(keyPath.length());
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
|
||||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getLength()));
|
||||
}
|
||||
}
|
||||
});
|
||||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||
final Set<String> blobsBuilder = new HashSet<>();
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final String keyPath = path.buildAsString();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
|
||||
if (blobItem instanceof CloudBlobDirectory) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
|
||||
// Lastly, we add the length of keyPath to the offset to strip this container's path.
|
||||
final String uriPath = uri.getPath();
|
||||
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
|
||||
}
|
||||
}
|
||||
});
|
||||
return Collections.unmodifiableSet(blobsBuilder);
|
||||
}
|
||||
|
||||
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
|
||||
boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
|
||||
assert inputStream.markSupported()
|
||||
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
try {
|
||||
final AccessCondition accessCondition =
|
||||
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
|
||||
SocketAccess.doPrivilegedVoidException(() ->
|
||||
blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get()));
|
||||
} catch (final StorageException se) {
|
||||
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
|
||||
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
|
||||
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
|
||||
}
|
||||
throw se;
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
|
||||
}
|
||||
|
||||
// package private for testing
|
||||
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
|
||||
return null;
|
||||
}
|
||||
|
||||
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
|
||||
return new InputStream() {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(stream::read);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(() -> stream.read(b));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue