Add query results directory and prevent the auto cleaner from cleaning it (#14446)

Adds support for automatic cleaning of a "query-results" directory in durable storage. This directory will be cleaned up only if the task id is not known to the overlord. This will allow the storage of query results after the task has finished running.
This commit is contained in:
Adarsh Sanjeev 2023-06-28 10:14:04 +05:30 committed by GitHub
parent 2cfb00b1de
commit 0335aaa279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 17 deletions

View File

@ -365,7 +365,7 @@ For detailed information about the settings related to durable storage, see [Dur
### Use durable storage for queries
When you run a query, include the context parameter `durableShuffleStorage` and set it to `true`.
When you run a query, include the context parameter `durableShuffleStorage` and set it to `true`.
For queries where you want to use fault tolerance for workers, set `faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to `true`.

View File

@ -88,13 +88,22 @@ public class DurableStorageCleaner implements OverlordDuty
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
Set<String> knownTaskIds = taskRunner.getKnownTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
Set<String> filesToRemove = new HashSet<>();
while (allFiles.hasNext()) {
String currentFile = allFiles.next();
String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
String nextDirName = DurableStorageUtils.getNextDirNameWithPrefixFromPath(currentFile);
if (nextDirName != null && !nextDirName.isEmpty()) {
if (runningTaskIds.contains(nextDirName)) {
// do nothing
} else if (DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
&& DurableStorageUtils.isQueryResultFileActive(currentFile, knownTaskIds)) {
// query results should not be cleaned even if the task has finished running
// do nothing
} else {
filesToRemove.add(currentFile);

View File

@ -31,6 +31,7 @@ import org.apache.druid.storage.StorageConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
@ -45,19 +46,25 @@ public class DurableStorageCleanerTest
private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM = EasyMock.mock(TaskRunnerWorkItem.class);
private static final String TASK_ID = "dummyTaskId";
private static final String STRAY_DIR = "strayDirectory";
private DurableStorageCleaner durableStorageCleaner;
@Test
public void testRun() throws Exception
@Before
public void setUp()
{
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR, TASK_MASTER);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
}
@Test
public void testRun() throws Exception
{
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR)
.stream()
@ -68,15 +75,49 @@ public class DurableStorageCleanerTest
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getKnownTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.run();
Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
}
@Test
public void testRunExcludesQueryDirectory() throws Exception
{
final String resultPath = DurableStorageUtils.QUERY_RESULTS_DIR + "/" + DurableStorageUtils.getControllerDirectory(TASK_ID) + "/results.json";
final String intermediateFilesPath = DurableStorageUtils.getControllerDirectory(TASK_ID) + "/intermediate.frame";
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(resultPath, STRAY_DIR, intermediateFilesPath)
.stream()
.iterator())
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of())
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getKnownTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.run();
Assert.assertEquals(Sets.newHashSet(STRAY_DIR, intermediateFilesPath), capturedArguments.getValue());
}
@Test
public void testGetSchedule()
{

View File

@ -20,11 +20,14 @@
package org.apache.druid.frame.util;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Helper class that fetches the directory and file names corresponding to file location
@ -32,7 +35,8 @@ import java.util.Iterator;
public class DurableStorageUtils
{
public static final String SUCCESS_MARKER_FILENAME = "__success";
public static final Splitter SPLITTER = Splitter.on("/").limit(2);
public static final Splitter SPLITTER = Splitter.on("/").limit(3);
public static final String QUERY_RESULTS_DIR = "query-results";
public static String getControllerDirectory(final String controllerTaskId)
{
@ -127,7 +131,7 @@ public class DurableStorageUtils
}
/**
* Tries to parse out the controller taskID from the input path.
* Tries to parse out the most top level directory from the path. Returns null if there is no such directory.
* <br></br>
* For eg:
* <br/>
@ -138,7 +142,7 @@ public class DurableStorageUtils
* </ul>
*/
@Nullable
public static String getControllerTaskIdWithPrefixFromPath(String path)
public static String getNextDirNameWithPrefixFromPath(String path)
{
if (path == null) {
return null;
@ -150,4 +154,35 @@ public class DurableStorageUtils
return null;
}
}
/**
* Tries to parse out the controller taskID from the query results path, and checks if the taskID is present in the
* set of known tasks.
* Returns true if the set contains the taskId.
* Returns false if taskId could not be parsed or if the set does not contain the taskId.
* <br></br>
* For eg:
* <br/>
* <ul>
* <li>for path <b>controller_query_id/task/123</b> the function will return <b>false</b></li>
* <li>for path <b>query-result/controller_query_id/results.json</b>, the function will return <b>true</b></li> if the controller_query_id is in known tasks
* <li>for path <b>query-result/controller_query_id/results.json</b>, the function will return <b>false</b></li> if the controller_query_id is not in known tasks
* <li>for path <b>null</b>, the function will return <b>false</b></li>
* </ul>
*/
public static boolean isQueryResultFileActive(String path, Set<String> knownTasks)
{
if (path == null) {
return false;
}
Iterator<String> elementsIterator = SPLITTER.split(path).iterator();
List<String> elements = ImmutableList.copyOf(elementsIterator);
if (elements.size() < 2) {
return false;
}
if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
return false;
}
return knownTasks.contains(elements.get(1));
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.frame.util;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
@ -26,13 +27,39 @@ public class DurableStorageUtilsTest
{
@Test
public void getControllerTaskIdWithPrefixFromPath()
public void getNextDirNameWithPrefixFromPath()
{
Assert.assertEquals("", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("/123/123"));
Assert.assertEquals("123", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("123"));
Assert.assertEquals("", DurableStorageUtils.getNextDirNameWithPrefixFromPath("/123/123"));
Assert.assertEquals("123", DurableStorageUtils.getNextDirNameWithPrefixFromPath("123"));
Assert.assertEquals("controller_query_123",
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("controller_query_123/123"));
Assert.assertEquals("", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(""));
Assert.assertNull(DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(null));
DurableStorageUtils.getNextDirNameWithPrefixFromPath("controller_query_123/123"));
Assert.assertEquals("", DurableStorageUtils.getNextDirNameWithPrefixFromPath(""));
Assert.assertNull(DurableStorageUtils.getNextDirNameWithPrefixFromPath(null));
}
@Test
public void isQueryResultFileActive()
{
Assert.assertTrue(DurableStorageUtils.isQueryResultFileActive(
DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
ImmutableSet.of("123")
));
Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
ImmutableSet.of("")
));
Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
DurableStorageUtils.QUERY_RESULTS_DIR + "/",
ImmutableSet.of("123")
));
Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
null,
ImmutableSet.of("123")
));
Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
DurableStorageUtils.QUERY_RESULTS_DIR,
ImmutableSet.of("123")
));
}
}