Enable Parallel Deletes in Azure Repository (#42783) (#43886)

* Parallel deletes via private thread pool
Armin Braun 2019-07-03 09:28:39 +02:00 committed by GitHub
parent 365dfe88ca
commit 826f38cd70
9 changed files with 101 additions and 14 deletions
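
The change fans each single-blob delete out onto a dedicated repository_azure thread pool and joins on the results with a GroupedActionListener feeding a PlainActionFuture. A minimal, hypothetical fragment (names invented; the actual wiring is in the AzureBlobContainer diff below) illustrating that fan-in primitive:

    // GroupedActionListener completes its delegate once it has received the expected
    // number of onResponse calls, or fails it on the first onFailure.
    PlainActionFuture<Collection<Void>> all = PlainActionFuture.newFuture();
    GroupedActionListener<Void> group = new GroupedActionListener<>(all, 3);
    group.onResponse(null);
    group.onResponse(null);
    group.onResponse(null);   // third response completes "all"
    all.actionGet();          // blocks until all three responses (or a failure) have arrived

In the real code below, any per-blob failure reaches the listener via ActionRunnable.onFailure, fails the future, and surfaces to the caller as an IOException("Exception during bulk delete").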

plugins/repository-azure/build.gradle

@@ -80,6 +80,8 @@ testClusters.integTest {
  // in a hacky way to change the protocol and endpoint. We must fix that.
  setting 'azure.client.integration_test.endpoint_suffix',
          { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }
  String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0)
  setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString()
} else {
  println "Using an external service to test the repository-azure plugin"
}

AzureBlobContainer.java

@@ -23,29 +23,37 @@ import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class AzureBlobContainer extends AbstractBlobContainer {

    private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
    private final AzureBlobStore blobStore;
    private final ThreadPool threadPool;
    private final String keyPath;

    public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
    AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
        super(path);
        this.blobStore = blobStore;
        this.keyPath = path.buildAsString();
        this.threadPool = threadPool;
    }

    @Override
@@ -117,6 +125,32 @@ public class AzureBlobContainer extends AbstractBlobContainer {
        }
    }

    @Override
    public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
        if (blobNames.isEmpty()) {
            return;
        }
        final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
        final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
        final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
        // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
        // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
        for (String blobName : blobNames) {
            executor.submit(new ActionRunnable<Void>(listener) {
                @Override
                protected void doRun() throws IOException {
                    deleteBlobIgnoringIfNotExists(blobName);
                    listener.onResponse(null);
                }
            });
        }
        try {
            result.actionGet();
        } catch (Exception e) {
            throw new IOException("Exception during bulk delete", e);
        }
    }

    @Override
    public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
        logger.trace("listBlobsByPrefix({})", prefix);

AzureBlobStore.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.InputStream;
@@ -40,15 +41,17 @@ import static java.util.Collections.emptyMap;

public class AzureBlobStore implements BlobStore {

    private final AzureStorageService service;
    private final ThreadPool threadPool;

    private final String clientName;
    private final String container;
    private final LocationMode locationMode;

    public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
    public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
        this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
        this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
        this.service = service;
        this.threadPool = threadPool;
        // locationMode is set per repository, not per client
        this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
        final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@@ -70,7 +73,7 @@ public class AzureBlobStore implements BlobStore {

    @Override
    public BlobContainer blobContainer(BlobPath path) {
        return new AzureBlobContainer(path, this);
        return new AzureBlobContainer(path, this, threadPool);
    }

    @Override

AzureRepository.java

@@ -75,7 +75,6 @@ public class AzureRepository extends BlobStoreRepository {

    private final BlobPath basePath;
    private final ByteSizeValue chunkSize;
    private final Environment environment;
    private final AzureStorageService storageService;
    private final boolean readonly;
@@ -83,7 +82,6 @@ public class AzureRepository extends BlobStoreRepository {
                           AzureStorageService storageService, ThreadPool threadPool) {
        super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
        this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
        this.environment = environment;
        this.storageService = storageService;
        final String basePath = Strings.trimLeadingCharacter(Repository.BASE_PATH_SETTING.get(metadata.settings()), '/');
@@ -115,7 +113,7 @@ public class AzureRepository extends BlobStoreRepository {

    @Override
    protected AzureBlobStore createBlobStore() {
        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
        logger.debug(() -> new ParameterizedMessage(
            "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
AzureRepositoryPlugin.java

@@ -22,12 +22,15 @@ package org.elasticsearch.repositories.azure;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
@@ -40,6 +43,8 @@ import java.util.Map;
 */
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

    public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";

    // protected for testing
    final AzureStorageService azureStoreService;
@@ -70,6 +75,15 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
        );
    }

    @Override
    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
        return Collections.singletonList(executorBuilder());
    }

    public static ExecutorBuilder<?> executorBuilder() {
        return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
    }

    @Override
    public void reload(Settings settings) {
        // secure settings should be readable
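
The ScalingExecutorBuilder above registers per-node settings for the new pool under the usual thread_pool.<name> convention, which is why the integration-test build can randomize thread_pool.repository_azure.max. As a hedged sketch (not part of this commit), capping the pool on a test node could look like the fragment below; in a real deployment the same keys would go into elasticsearch.yml:

    // Assumes ScalingExecutorBuilder's standard setting names thread_pool.repository_azure.{core,max,keep_alive}.
    Settings nodeSettings = Settings.builder()
        .put("thread_pool.repository_azure.max", 4)             // at most 4 concurrent blob deletes per node
        .put("thread_pool.repository_azure.keep_alive", "30s")  // idle workers retire after 30 seconds
        .build();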

AzureBlobStoreContainerTests.java

@@ -23,13 +23,31 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

    private ThreadPool threadPool;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
    }

    @Override
    protected BlobStore newBlobStore() {
        RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
        AzureStorageServiceMock client = new AzureStorageServiceMock();
        return new AzureBlobStore(repositoryMetaData, client);
        return new AzureBlobStore(repositoryMetaData, client, threadPool);
    }
}

AzureBlobStoreTests.java

@@ -22,13 +22,31 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreTests extends ESBlobStoreTestCase {

    private ThreadPool threadPool;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
    }

    @Override
    protected BlobStore newBlobStore() {
        RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
        AzureStorageServiceMock client = new AzureStorageServiceMock();
        return new AzureBlobStore(repositoryMetaData, client);
        return new AzureBlobStore(repositoryMetaData, client, threadPool);
    }
}
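
For reference, a hypothetical fragment (not in this commit) showing how the new bulk delete could be exercised with the same mock service and test pool wiring as the two test classes above; the blob names and the try/finally shape are invented for illustration:

    ThreadPool threadPool = new TestThreadPool("bulk-delete-sketch", AzureRepositoryPlugin.executorBuilder());
    try {
        AzureBlobStore store = new AzureBlobStore(
            new RepositoryMetaData("azure", "ittest", Settings.EMPTY), new AzureStorageServiceMock(), threadPool);
        BlobContainer container = store.blobContainer(BlobPath.cleanPath());
        // Each name is deleted on a repository_azure worker; blobs that do not exist are ignored.
        container.deleteBlobsIgnoringIfNotExists(Arrays.asList("blob-0", "blob-1", "blob-2"));
    } finally {
        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
    }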

BlobStoreRepository.java

@@ -166,7 +166,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

    protected final NamedXContentRegistry namedXContentRegistry;

    private final ThreadPool threadPool;
    // widened from private so that plugin repositories (e.g. AzureRepository.createBlobStore above) can pass the pool to their blob store
    protected final ThreadPool threadPool;

    private static final int BUFFER_SIZE = 4096;

TestThreadPool.java

@@ -24,12 +24,12 @@ import org.elasticsearch.node.Node;

public class TestThreadPool extends ThreadPool {

    public TestThreadPool(String name) {
        this(name, Settings.EMPTY);
    public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
        this(name, Settings.EMPTY, customBuilders);
    }

    public TestThreadPool(String name, Settings settings) {
        super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
    public TestThreadPool(String name, Settings settings, ExecutorBuilder<?>... customBuilders) {
        super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
    }
}