Fix ingestion task failure when no input split to process (#11553)

* fix ingestion task failure when no input split to process

* add IT

* fix IT
Maytas Monsereenusorn 2021-08-09 23:11:08 +07:00 committed by GitHub
parent 3e7cba738f
commit 06bae29979
4 changed files with 62 additions and 7 deletions
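
The core of the fix, as a minimal self-contained sketch (the class, field, and method names below are illustrative stand-ins, not the actual Druid source): when the input source matches no files, no subtask is ever spawned, the lazily created report collector therefore stays null, and the report accessors must tolerate that instead of throwing a NullPointerException.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PhaseRunnerSketch<R>
{
  // Created lazily, only once there is at least one input split to process.
  private Map<String, R> reportCollector;

  // Called when a subtask is spawned for an input split; the first call creates the collector.
  void onSubTaskSpawned()
  {
    if (reportCollector == null) {
      reportCollector = new HashMap<>();
    }
  }

  void collectReport(String subTaskId, R report)
  {
    // Reports only arrive from subtasks this runner spawned, so the collector
    // is guaranteed to exist by the time this method is called.
    assert reportCollector != null;
    reportCollector.put(subTaskId, report);
  }

  Map<String, R> getReports()
  {
    // With no input splits, no subtask ever ran and the collector was never created:
    // return an empty map instead of dereferencing null.
    return reportCollector == null ? Collections.emptyMap() : reportCollector;
  }
}

The getReports() change in the first hunk below applies exactly this null guard, while collectReport() keeps an assert because it is only ever invoked by a subtask that the monitor itself spawned.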

ParallelIndexPhaseRunner.java

@@ -289,13 +289,17 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
@Override
public void collectReport(SubTaskReportType report)
{
// This method is only called when there is a subtask sending its report.
// Since TaskMonitor is responsible for spawning subtasks, the taskMonitor cannot be null if a subtask is sending a report.
// This null check ensures that the contract described above is not broken.
assert taskMonitor != null;
taskMonitor.collectReport(report);
}
@Override
public Map<String, SubTaskReportType> getReports()
{
return taskMonitor.getReports();
return taskMonitor == null ? Collections.emptyMap() : taskMonitor.getReports();
}
@Override

SinglePhaseParallelIndexingTest.java

@@ -81,6 +81,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
@@ -368,7 +369,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null
)
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
@@ -393,6 +395,24 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
@Test
public void testRunParallelWithNoInputSplitToProcess()
{
// The input source filter on this task does not match any input,
// so this task will have no input split to process.
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
true,
true,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
"non_existing_file_filter"
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
// Task state should still be SUCCESS even if there is no input split to process
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
@Test
public void testOverwriteAndAppend()
{
@@ -437,7 +457,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
segmentGranularity,
appendToExisting,
splittableInputSource,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
VALID_INPUT_SOURCE_FILTER
);
}
@@ -446,7 +467,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
ParallelIndexTuningConfig tuningConfig
ParallelIndexTuningConfig tuningConfig,
String inputSourceFilter
)
{
// set up ingestion spec
@@ -469,7 +491,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting,
null
@@ -499,7 +521,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
getObjectMapper()
),
new ParallelIndexIOConfig(
new LocalFirehoseFactory(inputDir, "test_*", null),
new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
appendToExisting
),
tuningConfig
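
As context for the precondition that testRunParallelWithNoInputSplitToProcess sets up above, here is a hedged plain-java.nio illustration (not the Druid LocalInputSource or LocalFirehoseFactory code) of a wildcard filter that matches nothing and therefore produces zero input files, and hence zero input splits:

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EmptySplitIllustration
{
  // Lists the files under inputDir whose names match the given glob, mimicking how a
  // wildcard filter such as "test_*" selects input files.
  static List<Path> listMatching(Path inputDir, String glob) throws IOException
  {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
    try (Stream<Path> files = Files.list(inputDir)) {
      return files.filter(p -> matcher.matches(p.getFileName())).collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException
  {
    // A filter like "non_existing_file_filter" matches no file name, so the resulting
    // list is empty; per this change the supervisor task should still finish with SUCCESS.
    System.out.println(listMatching(Paths.get("."), "non_existing_file_filter").size());
  }
}

For any input directory that contains no file literally named non_existing_file_filter, this prints 0, which is exactly the zero-split situation the null guard in ParallelIndexPhaseRunner now handles.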

AbstractITBatchIndexTest.java

@@ -302,7 +302,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
}
}
private void submitTaskAndWait(
protected void submitTaskAndWait(
String taskSpec,
String dataSourceName,
boolean waitForNewVersion,

ITIndexerTest.java

@@ -169,6 +169,35 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
}
}
@Test
public void testReIndexWithNonExistingDatasource() throws Exception
{
Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
final String fullBaseDatasourceName = "nonExistingDatasource2904";
final String fullReindexDatasourceName = "newDatasource123";
String taskSpec = StringUtils.replace(
getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
"%%DATASOURCE%%",
fullBaseDatasourceName
);
taskSpec = StringUtils.replace(
taskSpec,
"%%REINDEX_DATASOURCE%%",
fullReindexDatasourceName
);
// This method will also verify that the task is successful after it finishes running.
// We expect the task to be successful even if the datasource to reindex does not exist.
submitTaskAndWait(
taskSpec,
fullReindexDatasourceName,
false,
false,
dummyPair
);
}
@Test
public void testMERGEIndexData() throws Exception
{