diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1ccd8129246..105afdf8f23 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -270,13 +270,18 @@ public class MaterializedViewSupervisor implements Supervisor
   void checkSegmentsAndSubmitTasks()
   {
     synchronized (taskLock) {
+      List<Interval> intervalsToRemove = new ArrayList<>();
       for (Map.Entry<Interval, HadoopIndexTask> entry : runningTasks.entrySet()) {
         Optional<TaskStatus> taskStatus = taskStorage.getStatus(entry.getValue().getId());
         if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) {
-          runningTasks.remove(entry.getKey());
-          runningVersion.remove(entry.getKey());
+          intervalsToRemove.add(entry.getKey());
         }
       }
+      for (Interval interval : intervalsToRemove) {
+        runningTasks.remove(interval);
+        runningVersion.remove(interval);
+      }
+
       if (runningTasks.size() == maxTaskCount) {
         //if the number of running tasks reach the max task count, supervisor won't submit new tasks.
         return;
@@ -288,6 +293,12 @@ public class MaterializedViewSupervisor implements Supervisor
       submitTasks(sortedToBuildVersion, baseSegments);
     }
   }
+
+  @VisibleForTesting
+  Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks()
+  {
+    return new Pair<>(runningTasks, runningVersion);
+  }
 
   /**
    * Find infomation about the intervals in which derived dataSource data should be rebuilt.
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 7b575f01dd7..1bf1c39709d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -27,7 +27,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.indexer.HadoopIOConfig;
+import org.apache.druid.indexer.HadoopIngestionSpec;
 import org.apache.druid.indexer.HadoopTuningConfig;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.HadoopIndexTask;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
@@ -41,7 +45,9 @@ import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -176,6 +182,83 @@ public class MaterializedViewSupervisorTest
     Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
   }
 
+  @Test
+  public void testCheckSegmentsAndSubmitTasks() throws IOException
+  {
+    Set<DataSegment> baseSegments = Sets.newHashSet(
+        new DataSegment(
+            "base",
+            Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+            "2015-01-03",
+            ImmutableMap.of(),
+            ImmutableList.of("dim1", "dim2"),
+            ImmutableList.of("m1"),
+            new HashBasedNumberedShardSpec(0, 1, null, null),
+            9,
+            1024
+        )
+    );
+    indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskStorage.getStatus("test_task1")).andReturn(Optional.of(TaskStatus.failure("test_task1"))).anyTimes();
+    expect(taskStorage.getStatus("test_task2")).andReturn(Optional.of(TaskStatus.running("test_task2"))).anyTimes();
+    EasyMock.replay(taskStorage);
+
+    Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> runningTasksPair = supervisor.getRunningTasks();
+    Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
+    Map<Interval, String> runningVersion = runningTasksPair.rhs;
+
+    DataSchema dataSchema = new DataSchema(
+        "test_datasource",
+        null,
+        null,
+        null,
+        TransformSpec.NONE,
+        objectMapper
+    );
+    HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
+    HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
+    HadoopIndexTask task1 = new HadoopIndexTask(
+        "test_task1",
+        spec,
+        null,
+        null,
+        null,
+        objectMapper,
+        null,
+        null,
+        null
+    );
+    runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1);
+    runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), "test_version1");
+
+    HadoopIndexTask task2 = new HadoopIndexTask(
+        "test_task2",
+        spec,
+        null,
+        null,
+        null,
+        objectMapper,
+        null,
+        null,
+        null
+    );
+    runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
+    runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
+
+    supervisor.checkSegmentsAndSubmitTasks();
+
+    Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>();
+    Map<Interval, String> expectedRunningVersion = new HashMap<>();
+    expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
+    expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
+
+    Assert.assertEquals(expectedRunningTasks, runningTasks);
+    Assert.assertEquals(expectedRunningVersion, runningVersion);
+
+  }
   @Test
   public void testSuspendedDoesntRun()