Add an OverlordHelper that cleans up durable storage objects in MSQ (#13269)

* scratch

* s3 ls fix, add docs

* add documentation, update method name

* Add tests, address review comments, change default value of the helper

* fix test

* update the default value of config, remove initial delay config

* Trigger Build

* update class

* add more tests

* docs update

* spellcheck

* remove IOException from the signature

* add back dummy constructor for initialization

* fix guice bindings, intellij inspections
Laksh Singla 2022-11-09 17:23:35 +05:30 committed by GitHub
parent d242a9314b
commit b7a513fe09
11 changed files with 425 additions and 12 deletions

View File

@@ -24,6 +24,7 @@ import org.apache.druid.guice.annotations.UnstableApi;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* Low level interface for interacting with different storage providers like S3, GCS, Azure and local file system.
@@ -104,4 +105,13 @@ public interface StorageConnector
* @throws IOException
*/
void deleteRecursively(String path) throws IOException;
/**
* Returns a list containing the names of all files directly present in the path. The returned filenames should be
* such that joining the dirName and the filename forms the full path that can be used as an argument for the other
* methods of the storage connector.
* For example, for an S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
* "parent1/parent2" should be "child", and for "parent1" it should be "parent2".
*/
List<String> listDir(String dirName);
}
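For illustration, a minimal sketch of a caller relying on the listDir contract above; the helper method and the "/" path joiner are editor's assumptions, not part of this change:
// Editor's sketch: joining dirName and each returned name yields a path accepted by the other connector methods,
// e.g. for s3://bucket/parent1/parent2/child, listDir("parent1/parent2") returns ["child"].
static void readDirContents(StorageConnector connector, String dirName) throws IOException
{
for (String fileName : connector.listDir(dirName)) {
try (InputStream inputStream = connector.read(dirName + "/" + fileName)) {
// Consume the stream; read() accepts the joined path, as the tests below also do.
System.out.println(dirName + "/" + fileName + " is readable");
}
}
}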

View File

@@ -21,6 +21,7 @@ package org.apache.druid.storage.local;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.StorageConnector;
@@ -29,6 +30,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Implementation that uses local filesystem. All paths are appended with the base path, in such a way that it is not visible
@@ -107,6 +111,23 @@ public class LocalFileStorageConnector implements StorageConnector
FileUtils.deleteDirectory(fileWithBasePath(dirName));
}
@Override
public List<String> listDir(String dirName)
{
File directory = fileWithBasePath(dirName);
if (!directory.exists()) {
throw new IAE("No directory exists on path [%s]", dirName);
}
if (!directory.isDirectory()) {
throw new IAE("Cannot list contents of [%s] since it is not a directory", dirName);
}
File[] files = directory.listFiles();
if (files == null) {
throw new ISE("Unable to fetch the file list from the path [%s]", dirName);
}
return Arrays.stream(files).map(File::getName).collect(Collectors.toList());
}
public File getBasePath()
{
return basePath;

View File

@@ -19,6 +19,8 @@
package org.apache.druid.storage.local;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
@@ -33,6 +35,9 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
public class LocalFileStorageConnectorTest
@@ -109,6 +114,43 @@ public class LocalFileStorageConnectorTest
storageConnectorProvider.get();
}
@Test
public void listFilesTest() throws Exception
{
String topLevelDir = "top" + UUID.randomUUID();
String uuidBase = topLevelDir + "/" + UUID.randomUUID();
String uuid1 = uuidBase + "/" + UUID.randomUUID();
String uuid2 = uuidBase + "/" + UUID.randomUUID();
createAndPopulateFile(uuid1);
createAndPopulateFile(uuid2);
List<String> topLevelDirContents = storageConnector.listDir(topLevelDir);
List<String> expectedTopLevelDirContents = ImmutableList.of(new File(uuidBase).getName());
Assert.assertEquals(expectedTopLevelDirContents, topLevelDirContents);
// Converted to a set since the output of listDir can be shuffled
Set<String> nextLevelDirContents = new HashSet<>(storageConnector.listDir(uuidBase));
Set<String> expectedNextLevelDirContents = ImmutableSet.of(new File(uuid1).getName(), new File(uuid2).getName());
Assert.assertEquals(expectedNextLevelDirContents, nextLevelDirContents);
// Check if listDir throws if an unknown path is passed as an argument
Assert.assertThrows(
IAE.class,
() -> {
storageConnector.listDir("unknown_top_path");
}
);
// Check if listDir throws if a file path is passed as an argument
Assert.assertThrows(
IAE.class,
() -> {
storageConnector.listDir(uuid1);
}
);
}
private void checkContents(String uuid) throws IOException
{
try (InputStream inputStream = storageConnector.read(uuid)) {

View File

@@ -204,6 +204,21 @@ The following table lists the context parameters for the MSQ task engine:
| rowsPerSegment | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| indexSpec | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
## Durable Storage
This section describes the advantages and performance implications of enabling durable storage while executing MSQ tasks.
To prevent durable storage from filling up with temporary files when tasks fail to clean them up, a periodic
cleaner can be scheduled that removes any directory which does not correspond to a currently running controller task. It uses
the storage connector to operate on the durable storage. The durable storage location should be used only to store the output
of the cluster's MSQ tasks. If the location contains other files or directories, they will get cleaned up as well.
The following table lists the properties that can be set to control the behavior of the cluster's durable storage.
|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.enable` | false | Whether to enable durable storage for the cluster. |
|`druid.msq.intermediate.storage.cleaner.enabled`| false | Whether the durable storage cleaner should be enabled for the cluster. This should be set on the Overlord. |
|`druid.msq.intermediate.storage.cleaner.delaySeconds`| 86400 | The delay (in seconds) between successive runs of the durable storage cleaner. This should be set on the Overlord. |
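As a minimal sketch, the Overlord runtime properties to turn the cleaner on could look as follows; the durable storage connector itself must be configured separately, and the delay shown is simply the default:

druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.cleaner.enabled=true
druid.msq.intermediate.storage.cleaner.delaySeconds=86400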
## Limits
Knowing the limits for the MSQ task engine can help you troubleshoot any [errors](#error-codes) that you encounter. Many of the errors occur as a result of reaching a limit.

View File

@@ -24,9 +24,13 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.indexing.DurableStorageCleaner;
import org.apache.druid.msq.indexing.DurableStorageCleanerConfig;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
@@ -44,18 +48,8 @@ public class MSQDurableStorageModule implements DruidModule
public static final String MSQ_INTERMEDIATE_STORAGE_ENABLED =
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "enable");
@Inject
private Properties properties;
public MSQDurableStorageModule()
{
}
public MSQDurableStorageModule(Properties properties)
{
this.properties = properties;
}
@Override
public List<? extends Module> getJacksonModules()
{
@@ -76,9 +70,25 @@
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
.toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))
.in(LazySingleton.class);
Multibinder.newSetBinder(binder, OverlordHelper.class)
.addBinding()
.to(DurableStorageCleaner.class);
JsonConfigProvider.bind(
binder,
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"),
DurableStorageCleanerConfig.class
);
}
}
@Inject
public void setProperties(Properties properties)
{
this.properties = properties;
}
private boolean isDurableShuffleStorageEnabled()
{
return Boolean.parseBoolean((String) properties.getOrDefault(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false"));

View File

@@ -26,7 +26,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface MultiStageQuery

View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.storage.StorageConnector;
import org.joda.time.Duration;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
* This class polls the durable storage for stray directories, i.e. the ones that do not have a controller task
* associated with them, and cleans them up periodically.
* This ensures that tasks which have exited abruptly or have failed to clean up the durable storage themselves
* do not pollute it with worker outputs and temporary files. See {@link DurableStorageCleanerConfig} for the configs.
*/
public class DurableStorageCleaner implements OverlordHelper
{
private static final Logger LOG = new Logger(DurableStorageCleaner.class);
private final DurableStorageCleanerConfig config;
private final StorageConnector storageConnector;
private final Provider<TaskMaster> taskMasterProvider;
@Inject
public DurableStorageCleaner(
final DurableStorageCleanerConfig config,
final @MultiStageQuery StorageConnector storageConnector,
@JacksonInject final Provider<TaskMaster> taskMasterProvider
)
{
this.config = config;
this.storageConnector = storageConnector;
this.taskMasterProvider = taskMasterProvider;
}
@Override
public boolean isEnabled()
{
return config.isEnabled();
}
@Override
public void schedule(ScheduledExecutorService exec)
{
LOG.info("Starting the DurableStorageCleaner with the config [%s]", config);
ScheduledExecutors.scheduleWithFixedDelay(
exec,
// The initial delay is passed explicitly so that we don't have to manually return the signal in the runnable
Duration.standardSeconds(config.getDelaySeconds()),
Duration.standardSeconds(config.getDelaySeconds()),
() -> {
try {
LOG.info("Starting the run of durable storage cleaner");
Optional<TaskRunner> taskRunnerOptional = taskMasterProvider.get().getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
LOG.info("Durable storage cleaner not running since the node is not the leader");
return;
}
TaskRunner taskRunner = taskRunnerOptional.get();
Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
Set<String> controllerDirectories = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
Set<String> unknownDirectories = Sets.difference(allDirectories, controllerDirectories);
LOG.info(
"The following directories do not have a corresponding MSQ task associated with them:\n%s\nThese will get cleaned up.",
unknownDirectories
);
for (String unknownDirectory : unknownDirectories) {
storageConnector.deleteRecursively(unknownDirectory);
}
}
catch (IOException e) {
throw new RuntimeException("Error while running the scheduled durable storage cleanup helper", e);
}
}
);
}
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
public class DurableStorageCleanerConfig
{
/**
* Whether the {@link DurableStorageCleaner} helper should be enabled or not
*/
@JsonProperty
public boolean enabled = false;
/**
* The delay (in seconds) between successive runs of the durable storage cleaner
*/
@JsonProperty
@Min(1)
public long delaySeconds = 86400L;
public boolean isEnabled()
{
return enabled;
}
public long getDelaySeconds()
{
return delaySeconds;
}
@Override
public String toString()
{
return "DurableStorageCleanerConfig{" +
"enabled=" + enabled +
", delaySeconds=" + delaySeconds +
'}';
}
}

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.storage.StorageConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Executors;
public class DurableStorageCleanerTest
{
private static final TaskMaster TASK_MASTER = EasyMock.mock(TaskMaster.class);
private static final TaskRunner TASK_RUNNER = EasyMock.mock(TaskRunner.class);
private static final StorageConnector STORAGE_CONNECTOR = EasyMock.mock(StorageConnector.class);
private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM = EasyMock.mock(TaskRunnerWorkItem.class);
private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
private static final String TASK_ID = "dummyTaskId";
private static final String STRAY_DIR = "strayDirectory";
@Test
public void testSchedule() throws IOException, InterruptedException
{
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR))
.anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<String> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteRecursively(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.schedule(Executors.newSingleThreadScheduledExecutor());
// Sleep long enough for the cleaner, scheduled with a 1 second fixed delay, to run at least once
Thread.sleep(8000L);
Assert.assertEquals(STRAY_DIR, capturedArguments.getValue());
}
}

View File

@@ -35,11 +35,13 @@ import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class S3StorageConnector implements StorageConnector
{
private final S3OutputConfig config;
private final ServerSideEncryptingAmazonS3 s3Client;
@@ -127,10 +129,43 @@
}
}
@Override
public List<String> listDir(String dirName)
{
ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
.withBucketName(config.getBucket())
.withPrefix(objectPath(dirName))
.withDelimiter(DELIM);
List<String> lsResult = new ArrayList<>();
ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
while (!objectListing.getObjectSummaries().isEmpty()) {
// Keep only the portion of each key after the last delimiter, i.e. the name relative to dirName
objectListing.getObjectSummaries()
.stream().map(S3ObjectSummary::getKey)
.map(
key -> {
int index = key.lastIndexOf(DELIM);
return key.substring(index + 1);
}
)
.filter(keyPart -> !keyPart.isEmpty())
.forEach(lsResult::add);
if (objectListing.isTruncated()) {
// The token for the next page is getNextContinuationToken(); getContinuationToken() only echoes back
// the token that was used for the current request
listObjectsRequest.withContinuationToken(objectListing.getNextContinuationToken());
objectListing = s3Client.listObjectsV2(listObjectsRequest);
} else {
break;
}
}
return lsResult;
}
@Nonnull
private String objectPath(String path)
{
return JOINER.join(config.getPrefix(), path);
}
}

View File

@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -43,6 +44,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class S3StorageConnectorTest
@@ -153,4 +155,22 @@
EasyMock.reset(S3_CLIENT, TEST_RESULT);
}
@Test
public void testListDir()
{
EasyMock.reset(S3_CLIENT, TEST_RESULT);
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setBucketName(BUCKET);
s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
.andReturn(TEST_RESULT);
EasyMock.replay(S3_CLIENT, TEST_RESULT);
List<String> listDirResult = storageConnector.listDir("/");
Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
}
}