diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index b34241a27ce..890cb90fd3d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -289,13 +289,17 @@ public abstract class ParallelIndexPhaseRunner getReports() { - return taskMonitor.getReports(); + return taskMonitor == null ? Collections.emptyMap() : taskMonitor.getReports(); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 7b9ac3d4614..f04559f26c8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -81,6 +81,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv } private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M"); + private static final String VALID_INPUT_SOURCE_FILTER = "test_*"; private final LockGranularity lockGranularity; private final boolean useInputFormatApi; @@ -368,7 +369,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv null, null, null - ) + ), + VALID_INPUT_SOURCE_FILTER ); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); @@ -393,6 +395,24 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv Assert.assertEquals(new HashSet<>(newSegments), visibles); } + @Test + public void testRunParallelWithNoInputSplitToProcess() + { + // The input source filter on this task does not match any input + // Hence, the this task will has no input split to process + final ParallelIndexSupervisorTask task = newTask( + Intervals.of("2017-12/P1M"), + Granularities.DAY, + true, + true, + AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING, + "non_existing_file_filter" + ); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + // Task state should still be SUCCESS even if no input split to process + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + } + @Test public void testOverwriteAndAppend() { @@ -437,7 +457,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv segmentGranularity, appendToExisting, splittableInputSource, - AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING + AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING, + VALID_INPUT_SOURCE_FILTER ); } @@ -446,7 +467,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv Granularity segmentGranularity, boolean appendToExisting, boolean splittableInputSource, - ParallelIndexTuningConfig tuningConfig + ParallelIndexTuningConfig tuningConfig, + String inputSourceFilter ) { // set up ingestion spec @@ -469,7 +491,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv ), new ParallelIndexIOConfig( null, - new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource), + new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource), DEFAULT_INPUT_FORMAT, appendToExisting, null @@ -499,7 +521,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv getObjectMapper() ), new ParallelIndexIOConfig( - new LocalFirehoseFactory(inputDir, "test_*", null), + new LocalFirehoseFactory(inputDir, inputSourceFilter, null), appendToExisting ), tuningConfig diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index acab50df3d3..6dca729fd6a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -302,7 +302,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest } } - private void submitTaskAndWait( + protected void submitTaskAndWait( String taskSpec, String dataSourceName, boolean waitForNewVersion, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index ec9c7535b90..833a0ac5855 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -169,6 +169,35 @@ public class ITIndexerTest extends AbstractITBatchIndexTest } } + @Test + public void testReIndexWithNonExistingDatasource() throws Exception + { + Pair dummyPair = new Pair<>(false, false); + final String fullBaseDatasourceName = "nonExistingDatasource2904"; + final String fullReindexDatasourceName = "newDatasource123"; + + String taskSpec = StringUtils.replace( + getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE), + "%%DATASOURCE%%", + fullBaseDatasourceName + ); + taskSpec = StringUtils.replace( + taskSpec, + "%%REINDEX_DATASOURCE%%", + fullReindexDatasourceName + ); + + // This method will also verify task is successful after task finish running + // We expect task to be successful even if the datasource to reindex does not exist + submitTaskAndWait( + taskSpec, + fullReindexDatasourceName, + false, + false, + dummyPair + ); + } + @Test public void testMERGEIndexData() throws Exception {