mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-10 23:15:04 +00:00
repository-azure: revert the fix for https://github.com/opensearch-project/OpenSearch/issues/1734 once upstream solution is available (#2446)
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
This commit is contained in:
parent
b619a050bf
commit
12dd5d76b5
@ -44,9 +44,9 @@ opensearchplugin {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api 'com.azure:azure-core:1.22.0'
|
||||
api 'com.azure:azure-storage-common:12.14.3'
|
||||
api 'com.azure:azure-core-http-netty:1.11.7'
|
||||
api 'com.azure:azure-core:1.26.0'
|
||||
api 'com.azure:azure-storage-common:12.15.0'
|
||||
api 'com.azure:azure-core-http-netty:1.11.8'
|
||||
api "io.netty:netty-codec-dns:${versions.netty}"
|
||||
api "io.netty:netty-codec-socks:${versions.netty}"
|
||||
api "io.netty:netty-codec-http2:${versions.netty}"
|
||||
@ -54,12 +54,12 @@ dependencies {
|
||||
api "io.netty:netty-resolver-dns:${versions.netty}"
|
||||
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
|
||||
implementation project(':modules:transport-netty4')
|
||||
api 'com.azure:azure-storage-blob:12.14.1'
|
||||
api 'com.azure:azure-storage-blob:12.14.4'
|
||||
api 'org.reactivestreams:reactive-streams:1.0.3'
|
||||
api 'io.projectreactor:reactor-core:3.4.15'
|
||||
api 'io.projectreactor.netty:reactor-netty:1.0.13'
|
||||
api 'io.projectreactor.netty:reactor-netty-core:1.0.13'
|
||||
api 'io.projectreactor.netty:reactor-netty-http:1.0.13'
|
||||
api 'io.projectreactor.netty:reactor-netty:1.0.16'
|
||||
api 'io.projectreactor.netty:reactor-netty-core:1.0.16'
|
||||
api 'io.projectreactor.netty:reactor-netty-http:1.0.16'
|
||||
api "org.slf4j:slf4j-api:${versions.slf4j}"
|
||||
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
|
||||
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
|
||||
|
@ -1 +0,0 @@
|
||||
194b21b804c20c85f7d2a6199280075f6747e188
|
@ -0,0 +1 @@
|
||||
461b89dcf8948a0c4a97d4f1d876f778d0cac7aa
|
@ -1 +0,0 @@
|
||||
c6b14fcca3e75acc8dbe07ac101afd05d48a1647
|
@ -0,0 +1 @@
|
||||
0ea66d4531fb41cb3b5ab55e2e7b7f301e7f8503
|
@ -1 +0,0 @@
|
||||
384763aef32d779ee22ef3faa03049fee7e0f6de
|
@ -0,0 +1 @@
|
||||
2b92020693d09e4980b96d278e8038a1087afea0
|
@ -1 +0,0 @@
|
||||
e8d6258aa8bf1594980c01294e60de74d13a815f
|
@ -0,0 +1 @@
|
||||
4d63ce8bbd20379c5e5262b1204ceac7b31a7743
|
@ -1 +0,0 @@
|
||||
cf216a9ba6b50210664761add9db744c9c3f51d8
|
@ -0,0 +1 @@
|
||||
d90829f6127966b0c35c4a3e8e23ca9ed29cd8a5
|
@ -1 +0,0 @@
|
||||
a67949c5946dd66c7ab0a3b059213c23345c32b1
|
@ -0,0 +1 @@
|
||||
8f842a912677f2bc614ff60fb9e786d4fa429c34
|
@ -1 +0,0 @@
|
||||
de7a38101098db9438c18fdd09acc5b79a2ec02a
|
@ -0,0 +1 @@
|
||||
93edb9a1dc774d843551a616e0f316e11ffa81ed
|
@ -35,7 +35,6 @@ package org.opensearch.repositories.azure;
|
||||
import com.azure.core.http.HttpMethod;
|
||||
import com.azure.core.http.HttpRequest;
|
||||
import com.azure.core.http.HttpResponse;
|
||||
import com.azure.core.http.rest.PagedResponse;
|
||||
import com.azure.core.http.rest.Response;
|
||||
import com.azure.core.util.Context;
|
||||
import com.azure.storage.blob.BlobClient;
|
||||
@ -52,7 +51,6 @@ import com.azure.storage.blob.models.ListBlobsOptions;
|
||||
import com.azure.storage.blob.options.BlobParallelUploadOptions;
|
||||
import com.azure.storage.common.implementation.Constants;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.util.Throwables;
|
||||
@ -84,7 +82,6 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@ -220,71 +217,50 @@ public class AzureBlobStore implements BlobStore {
|
||||
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
String continuationToken = null;
|
||||
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
|
||||
// Skipping prefixes as those are not deletable and should not be there
|
||||
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
|
||||
|
||||
do {
|
||||
// Fetch one page at a time, others are going to be fetched by continuation token
|
||||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
|
||||
// gets addressed.
|
||||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
|
||||
.streamByPage(continuationToken)
|
||||
.findFirst();
|
||||
outstanding.incrementAndGet();
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final long len = blobItem.getProperties().getContentLength();
|
||||
|
||||
if (!pageOpt.isPresent()) {
|
||||
// No more pages, should never happen
|
||||
break;
|
||||
}
|
||||
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
|
||||
logger.trace(
|
||||
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
|
||||
);
|
||||
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
|
||||
logger.trace(
|
||||
() -> new ParameterizedMessage(
|
||||
"container [{}]: blob [{}] deleted status [{}].",
|
||||
container,
|
||||
blobItem.getName(),
|
||||
response.getStatusCode()
|
||||
)
|
||||
);
|
||||
|
||||
final PagedResponse<BlobItem> page = pageOpt.get();
|
||||
for (final BlobItem blobItem : page.getValue()) {
|
||||
// Skipping prefixes as those are not deletable and should not be there
|
||||
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
|
||||
|
||||
outstanding.incrementAndGet();
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final long len = blobItem.getProperties().getContentLength();
|
||||
|
||||
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
|
||||
logger.trace(
|
||||
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
|
||||
);
|
||||
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
|
||||
logger.trace(
|
||||
() -> new ParameterizedMessage(
|
||||
"container [{}]: blob [{}] deleted status [{}].",
|
||||
container,
|
||||
blobItem.getName(),
|
||||
response.getStatusCode()
|
||||
)
|
||||
);
|
||||
|
||||
blobsDeleted.incrementAndGet();
|
||||
if (len >= 0) {
|
||||
bytesDeleted.addAndGet(len);
|
||||
}
|
||||
blobsDeleted.incrementAndGet();
|
||||
if (len >= 0) {
|
||||
bytesDeleted.addAndGet(len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptions.add(e);
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Fetch next continuation token
|
||||
continuationToken = page.getContinuationToken();
|
||||
} while (StringUtils.isNotBlank(continuationToken));
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
if (outstanding.decrementAndGet() == 0) {
|
||||
result.onResponse(null);
|
||||
}
|
||||
@ -325,39 +301,19 @@ public class AzureBlobStore implements BlobStore {
|
||||
.setPrefix(keyPath + (prefix == null ? "" : prefix));
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
String continuationToken = null;
|
||||
|
||||
do {
|
||||
// Fetch one page at a time, others are going to be fetched by continuation token
|
||||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
|
||||
// gets addressed
|
||||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
|
||||
.streamByPage(continuationToken)
|
||||
.findFirst();
|
||||
|
||||
if (!pageOpt.isPresent()) {
|
||||
// No more pages, should never happen
|
||||
break;
|
||||
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
|
||||
// Skipping over the prefixes, only look for the blobs
|
||||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final PagedResponse<BlobItem> page = pageOpt.get();
|
||||
for (final BlobItem blobItem : page.getValue()) {
|
||||
// Skipping over the prefixes, only look for the blobs
|
||||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
|
||||
continue;
|
||||
}
|
||||
final String name = getBlobName(blobItem.getName(), container, keyPath);
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
|
||||
|
||||
final String name = getBlobName(blobItem.getName(), container, keyPath);
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
|
||||
|
||||
final BlobItemProperties properties = blobItem.getProperties();
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
|
||||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
|
||||
}
|
||||
|
||||
// Fetch next continuation token
|
||||
continuationToken = page.getContinuationToken();
|
||||
} while (StringUtils.isNotBlank(continuationToken));
|
||||
final BlobItemProperties properties = blobItem.getProperties();
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
|
||||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
|
||||
}
|
||||
});
|
||||
|
||||
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
|
||||
@ -373,36 +329,17 @@ public class AzureBlobStore implements BlobStore {
|
||||
.setPrefix(keyPath);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
String continuationToken = null;
|
||||
|
||||
do {
|
||||
// Fetch one page at a time, others are going to be fetched by continuation token
|
||||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
|
||||
// gets addressed
|
||||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
|
||||
.streamByPage(continuationToken)
|
||||
.findFirst();
|
||||
|
||||
if (!pageOpt.isPresent()) {
|
||||
// No more pages, should never happen
|
||||
break;
|
||||
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
|
||||
// Skipping over the blobs, only look for prefixes
|
||||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
|
||||
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
|
||||
// Lastly, we add the length of keyPath to the offset to strip this container's path.
|
||||
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
|
||||
blobsBuilder.add(name);
|
||||
}
|
||||
|
||||
final PagedResponse<BlobItem> page = pageOpt.get();
|
||||
for (final BlobItem blobItem : page.getValue()) {
|
||||
// Skipping over the blobs, only look for prefixes
|
||||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
|
||||
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
|
||||
// Lastly, we add the length of keyPath to the offset to strip this container's path.
|
||||
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
|
||||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
|
||||
blobsBuilder.add(name);
|
||||
}
|
||||
}
|
||||
// Fetch next continuation token
|
||||
continuationToken = page.getContinuationToken();
|
||||
} while (StringUtils.isNotBlank(continuationToken));
|
||||
}
|
||||
});
|
||||
|
||||
return Collections.unmodifiableMap(
|
||||
|
Loading…
x
Reference in New Issue
Block a user