[plugin] repository-azure is not working properly, hangs on basic operations (#1740) (#1749)

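This commit fixes repository-azure hanging on basic operations by fetching blob
listings one page at a time. The change will be reverted once the underlying issue
(https://github.com/Azure/azure-sdk-for-java/issues/26064) is fixed upstream in the
Azure library.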

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
This commit is contained in:
Andriy Redko 2021-12-16 16:10:52 -05:00 committed by GitHub
parent ef44182731
commit 20692a2ff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 59 deletions

View File

@ -283,4 +283,23 @@ task azureThirdPartyTest(type: Test) {
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }" nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
} }
} }
check.dependsOn(azureThirdPartyTest)
// Test task that runs AzureStorageCleanupThirdPartyTests with the
// 'javax.xml.stream.XMLInputFactory' system property pointed at
// com.sun.xml.internal.stream.XMLInputFactoryImpl.
// NOTE(review): presumably this is the JDK's built-in XMLInputFactory implementation
// (hence "DefaultXml" in the task name) — confirm.
task azureThirdPartyDefaultXmlTest(type: Test) {
// Reuse the compiled classes and runtime classpath of the internal cluster test source set.
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
SourceSet internalTestSourceSet = sourceSets.getByName(InternalClusterTestPlugin.SOURCE_SET_NAME)
setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs())
setClasspath(internalTestSourceSet.getRuntimeClasspath())
dependsOn tasks.internalClusterTest
// Restrict this task to the storage cleanup third-party test class only.
include '**/AzureStorageCleanupThirdPartyTests.class'
// Select the XMLInputFactory implementation used by the test JVM.
systemProperty 'javax.xml.stream.XMLInputFactory', "com.sun.xml.internal.stream.XMLInputFactoryImpl"
// Azure settings are passed through as system properties; unset values become empty strings.
systemProperty 'test.azure.account', azureAccount ? azureAccount : ""
systemProperty 'test.azure.key', azureKey ? azureKey : ""
systemProperty 'test.azure.sas_token', azureSasToken ? azureSasToken : ""
systemProperty 'test.azure.container', azureContainer ? azureContainer : ""
// The base path is suffixed with "_third_party_tests_" plus the build's test seed.
systemProperty 'test.azure.base', (azureBasePath ? azureBasePath : "") + "_third_party_tests_" + BuildParams.testSeed
if (useFixture) {
// With the fixture enabled, the endpoint suffix comes from azureAddress; the closure inside
// the GString defers the azureAddress.call() until the string value is actually read.
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
}
}
check.dependsOn(azureThirdPartyTest, azureThirdPartyDefaultXmlTest)

View File

@ -35,6 +35,7 @@ package org.opensearch.repositories.azure;
import com.azure.core.http.HttpMethod; import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest; import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse; import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response; import com.azure.core.http.rest.Response;
import com.azure.core.util.Context; import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobClient;
@ -51,6 +52,7 @@ import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions; import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.common.implementation.Constants; import com.azure.storage.common.implementation.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables; import org.apache.logging.log4j.core.util.Throwables;
@ -82,6 +84,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -217,50 +220,71 @@ public class AzureBlobStore implements BlobStore {
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path); final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);
SocketAccess.doPrivilegedVoidException(() -> { SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) { String continuationToken = null;
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
outstanding.incrementAndGet(); do {
executor.execute(new AbstractRunnable() { // Fetch one page at a time, others are going to be fetched by continuation token
@Override // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
protected void doRun() throws Exception { // gets addressed.
final long len = blobItem.getProperties().getContentLength(); final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName()); if (!pageOpt.isPresent()) {
logger.trace( // No more pages, should never happen
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName()) break;
); }
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);
blobsDeleted.incrementAndGet(); final PagedResponse<BlobItem> page = pageOpt.get();
if (len >= 0) { for (final BlobItem blobItem : page.getValue()) {
bytesDeleted.addAndGet(len); // Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len = blobItem.getProperties().getContentLength();
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
);
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
} }
}
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
exceptions.add(e); exceptions.add(e);
}
@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
} }
}
}); @Override
} public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
});
}
// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
}); });
if (outstanding.decrementAndGet() == 0) { if (outstanding.decrementAndGet() == 0) {
result.onResponse(null); result.onResponse(null);
} }
@ -301,20 +325,39 @@ public class AzureBlobStore implements BlobStore {
.setPrefix(keyPath + (prefix == null ? "" : prefix)); .setPrefix(keyPath + (prefix == null ? "" : prefix));
SocketAccess.doPrivilegedVoidException(() -> { SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) { String continuationToken = null;
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { do {
continue; // Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();
if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
} }
final String name = getBlobName(blobItem.getName(), container, keyPath); final PagedResponse<BlobItem> page = pageOpt.get();
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); for (final BlobItem blobItem : page.getValue()) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
}
final BlobItemProperties properties = blobItem.getProperties(); final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength())); logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}
final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}
// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
}); });
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap(); return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
@ -330,18 +373,36 @@ public class AzureBlobStore implements BlobStore {
.setPrefix(keyPath); .setPrefix(keyPath);
SocketAccess.doPrivilegedVoidException(() -> { SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) { String continuationToken = null;
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { do {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/ // Fetch one page at a time, others are going to be fetched by continuation token
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /. // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// Lastly, we add the length of keyPath to the offset to strip this container's path. // gets addressed
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", ""); final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); .streamByPage(continuationToken)
blobsBuilder.add(name); .findFirst();
if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
} }
}
; final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
}
}
// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
}); });
return Collections.unmodifiableMap( return Collections.unmodifiableMap(

View File

@ -208,6 +208,7 @@ public class AzureHttpHandler implements HttpHandler {
} }
list.append("</Blobs>"); list.append("</Blobs>");
list.append("<NextMarker />");
list.append("</EnumerationResults>"); list.append("</EnumerationResults>");
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);