From 485de6a14a5f04a9996cdec2563d66577c162ea5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 19 May 2022 07:43:50 -0700 Subject: [PATCH] Add builder for TaskToolbox. (#12539) * Add builder for TaskToolbox. The main purpose of this change is to make it easier to create TaskToolboxes in tests. However, the builder is used in production too, by TaskToolboxFactory. * Fix imports, adjust formatting. * Fix import. --- .../druid/indexing/common/TaskToolbox.java | 312 ++++++++++++++++++ .../indexing/common/TaskToolboxFactory.java | 82 ++--- .../common/task/BatchAppenderatorsTest.java | 102 ++---- .../common/task/CompactionTaskRunTest.java | 95 +++--- .../common/task/CompactionTaskTest.java | 163 ++++----- .../common/task/IngestionTestBase.java | 88 ++--- ...stractParallelIndexSupervisorTaskTest.java | 112 +++---- 7 files changed, 600 insertions(+), 354 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 7c3275003b1..fae02fe08e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -286,6 +286,7 @@ public class TaskToolbox /** * Adds a monitor to the monitorScheduler if it is configured + * * @param monitor */ public void addMonitor(Monitor monitor) @@ -298,6 +299,7 @@ public class TaskToolbox /** * Adds a monitor to the monitorScheduler if it is configured + * * @param monitor */ public void removeMonitor(Monitor monitor) @@ -459,4 +461,314 @@ public class TaskToolbox { return shuffleClient; } + + public static class Builder + { + private TaskConfig config; + private DruidNode taskExecutorNode; + private TaskActionClient taskActionClient; + private ServiceEmitter emitter; + private DataSegmentPusher segmentPusher; + private DataSegmentKiller dataSegmentKiller; + private DataSegmentMover dataSegmentMover; + private DataSegmentArchiver dataSegmentArchiver; + private DataSegmentAnnouncer segmentAnnouncer; + private DataSegmentServerAnnouncer serverAnnouncer; + private SegmentHandoffNotifierFactory handoffNotifierFactory; + private Provider queryRunnerFactoryConglomerateProvider; + private QueryProcessingPool queryProcessingPool; + private JoinableFactory joinableFactory; + private Provider monitorSchedulerProvider; + private SegmentCacheManager segmentCacheManager; + private ObjectMapper jsonMapper; + private File taskWorkDir; + private IndexIO indexIO; + private Cache cache; + private CacheConfig cacheConfig; + private CachePopulatorStats cachePopulatorStats; + private IndexMergerV9 indexMergerV9; + private DruidNodeAnnouncer druidNodeAnnouncer; + private DruidNode druidNode; + private LookupNodeService lookupNodeService; + private DataNodeService dataNodeService; + private TaskReportFileWriter taskReportFileWriter; + private AuthorizerMapper authorizerMapper; + private ChatHandlerProvider chatHandlerProvider; + private RowIngestionMetersFactory rowIngestionMetersFactory; + private AppenderatorsManager appenderatorsManager; + private IndexingServiceClient indexingServiceClient; + private CoordinatorClient coordinatorClient; + private IntermediaryDataManager intermediaryDataManager; + private IndexTaskClientFactory supervisorTaskClientFactory; + private ShuffleClient shuffleClient; + + public Builder() + { + } + + public Builder config(final TaskConfig config) + { + this.config = config; + return this; + } + + public Builder taskExecutorNode(final DruidNode taskExecutorNode) + { + this.taskExecutorNode = taskExecutorNode; + return this; + } + + public Builder taskActionClient(final TaskActionClient taskActionClient) + { + this.taskActionClient = taskActionClient; + return this; + } + + public Builder emitter(final ServiceEmitter emitter) + { + this.emitter = emitter; + return this; + } + + public Builder segmentPusher(final DataSegmentPusher segmentPusher) + { + this.segmentPusher = segmentPusher; + return this; + } + + public Builder dataSegmentKiller(final DataSegmentKiller dataSegmentKiller) + { + this.dataSegmentKiller = dataSegmentKiller; + return this; + } + + public Builder dataSegmentMover(final DataSegmentMover dataSegmentMover) + { + this.dataSegmentMover = dataSegmentMover; + return this; + } + + public Builder dataSegmentArchiver(final DataSegmentArchiver dataSegmentArchiver) + { + this.dataSegmentArchiver = dataSegmentArchiver; + return this; + } + + public Builder segmentAnnouncer(final DataSegmentAnnouncer segmentAnnouncer) + { + this.segmentAnnouncer = segmentAnnouncer; + return this; + } + + public Builder serverAnnouncer(final DataSegmentServerAnnouncer serverAnnouncer) + { + this.serverAnnouncer = serverAnnouncer; + return this; + } + + public Builder handoffNotifierFactory(final SegmentHandoffNotifierFactory handoffNotifierFactory) + { + this.handoffNotifierFactory = handoffNotifierFactory; + return this; + } + + public Builder queryRunnerFactoryConglomerateProvider(final Provider queryRunnerFactoryConglomerateProvider) + { + this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider; + return this; + } + + public Builder queryProcessingPool(final QueryProcessingPool queryProcessingPool) + { + this.queryProcessingPool = queryProcessingPool; + return this; + } + + public Builder joinableFactory(final JoinableFactory joinableFactory) + { + this.joinableFactory = joinableFactory; + return this; + } + + public Builder monitorSchedulerProvider(final Provider monitorSchedulerProvider) + { + this.monitorSchedulerProvider = monitorSchedulerProvider; + return this; + } + + public Builder segmentCacheManager(final SegmentCacheManager segmentCacheManager) + { + this.segmentCacheManager = segmentCacheManager; + return this; + } + + public Builder jsonMapper(final ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + return this; + } + + public Builder taskWorkDir(final File taskWorkDir) + { + this.taskWorkDir = taskWorkDir; + return this; + } + + public Builder indexIO(final IndexIO indexIO) + { + this.indexIO = indexIO; + return this; + } + + public Builder cache(final Cache cache) + { + this.cache = cache; + return this; + } + + public Builder cacheConfig(final CacheConfig cacheConfig) + { + this.cacheConfig = cacheConfig; + return this; + } + + public Builder cachePopulatorStats(final CachePopulatorStats cachePopulatorStats) + { + this.cachePopulatorStats = cachePopulatorStats; + return this; + } + + public Builder indexMergerV9(final IndexMergerV9 indexMergerV9) + { + this.indexMergerV9 = indexMergerV9; + return this; + } + + public Builder druidNodeAnnouncer(final DruidNodeAnnouncer druidNodeAnnouncer) + { + this.druidNodeAnnouncer = druidNodeAnnouncer; + return this; + } + + public Builder druidNode(final DruidNode druidNode) + { + this.druidNode = druidNode; + return this; + } + + public Builder lookupNodeService(final LookupNodeService lookupNodeService) + { + this.lookupNodeService = lookupNodeService; + return this; + } + + public Builder dataNodeService(final DataNodeService dataNodeService) + { + this.dataNodeService = dataNodeService; + return this; + } + + public Builder taskReportFileWriter(final TaskReportFileWriter taskReportFileWriter) + { + this.taskReportFileWriter = taskReportFileWriter; + return this; + } + + public Builder authorizerMapper(final AuthorizerMapper authorizerMapper) + { + this.authorizerMapper = authorizerMapper; + return this; + } + + public Builder chatHandlerProvider(final ChatHandlerProvider chatHandlerProvider) + { + this.chatHandlerProvider = chatHandlerProvider; + return this; + } + + public Builder rowIngestionMetersFactory(final RowIngestionMetersFactory rowIngestionMetersFactory) + { + this.rowIngestionMetersFactory = rowIngestionMetersFactory; + return this; + } + + public Builder appenderatorsManager(final AppenderatorsManager appenderatorsManager) + { + this.appenderatorsManager = appenderatorsManager; + return this; + } + + public Builder indexingServiceClient(final IndexingServiceClient indexingServiceClient) + { + this.indexingServiceClient = indexingServiceClient; + return this; + } + + public Builder coordinatorClient(final CoordinatorClient coordinatorClient) + { + this.coordinatorClient = coordinatorClient; + return this; + } + + public Builder intermediaryDataManager(final IntermediaryDataManager intermediaryDataManager) + { + this.intermediaryDataManager = intermediaryDataManager; + return this; + } + + public Builder supervisorTaskClientFactory(final IndexTaskClientFactory supervisorTaskClientFactory) + { + this.supervisorTaskClientFactory = supervisorTaskClientFactory; + return this; + } + + public Builder shuffleClient(final ShuffleClient shuffleClient) + { + this.shuffleClient = shuffleClient; + return this; + } + + public TaskToolbox build() + { + return new TaskToolbox( + config, + taskExecutorNode, + taskActionClient, + emitter, + segmentPusher, + dataSegmentKiller, + dataSegmentMover, + dataSegmentArchiver, + segmentAnnouncer, + serverAnnouncer, + handoffNotifierFactory, + queryRunnerFactoryConglomerateProvider, + queryProcessingPool, + joinableFactory, + monitorSchedulerProvider, + segmentCacheManager, + jsonMapper, + taskWorkDir, + indexIO, + cache, + cacheConfig, + cachePopulatorStats, + indexMergerV9, + druidNodeAnnouncer, + druidNode, + lookupNodeService, + dataNodeService, + taskReportFileWriter, + intermediaryDataManager, + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + appenderatorsManager, + indexingServiceClient, + coordinatorClient, + supervisorTaskClientFactory, + shuffleClient + ); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index df50c233015..9ccc94659b0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -189,44 +189,48 @@ public class TaskToolboxFactory public TaskToolbox build(Task task) { final File taskWorkDir = config.getTaskWorkDir(task.getId()); - return new TaskToolbox( - config, - taskExecutorNode, - taskActionClientFactory.create(task), - emitter, - segmentPusher, - dataSegmentKiller, - dataSegmentMover, - dataSegmentArchiver, - segmentAnnouncer, - serverAnnouncer, - handoffNotifierFactory, - queryRunnerFactoryConglomerateProvider, - queryProcessingPool, - joinableFactory, - monitorSchedulerProvider, - segmentCacheManagerFactory.manufacturate(taskWorkDir), - jsonMapper, - taskWorkDir, - indexIO, - cache, - cacheConfig, - cachePopulatorStats, - indexMergerV9Factory.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())), - druidNodeAnnouncer, - druidNode, - lookupNodeService, - dataNodeService, - taskReportFileWriter, - intermediaryDataManager, - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager, - indexingServiceClient, - coordinatorClient, - supervisorTaskClientFactory, - shuffleClient - ); + return new TaskToolbox.Builder() + .config(config) + .taskExecutorNode(taskExecutorNode) + .taskActionClient(taskActionClientFactory.create(task)) + .emitter(emitter) + .segmentPusher(segmentPusher) + .dataSegmentKiller(dataSegmentKiller) + .dataSegmentMover(dataSegmentMover) + .dataSegmentArchiver(dataSegmentArchiver) + .segmentAnnouncer(segmentAnnouncer) + .serverAnnouncer(serverAnnouncer) + .handoffNotifierFactory(handoffNotifierFactory) + .queryRunnerFactoryConglomerateProvider(queryRunnerFactoryConglomerateProvider) + .queryProcessingPool(queryProcessingPool) + .joinableFactory(joinableFactory) + .monitorSchedulerProvider(monitorSchedulerProvider) + .segmentCacheManager(segmentCacheManagerFactory.manufacturate(taskWorkDir)) + .jsonMapper(jsonMapper) + .taskWorkDir(taskWorkDir) + .indexIO(indexIO) + .cache(cache) + .cacheConfig(cacheConfig) + .cachePopulatorStats(cachePopulatorStats) + .indexMergerV9( + indexMergerV9Factory.create( + task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns()) + ) + ) + .druidNodeAnnouncer(druidNodeAnnouncer) + .druidNode(druidNode) + .lookupNodeService(lookupNodeService) + .dataNodeService(dataNodeService) + .taskReportFileWriter(taskReportFileWriter) + .intermediaryDataManager(intermediaryDataManager) + .authorizerMapper(authorizerMapper) + .chatHandlerProvider(chatHandlerProvider) + .rowIngestionMetersFactory(rowIngestionMetersFactory) + .appenderatorsManager(appenderatorsManager) + .indexingServiceClient(indexingServiceClient) + .coordinatorClient(coordinatorClient) + .supervisorTaskClientFactory(supervisorTaskClientFactory) + .shuffleClient(shuffleClient) + .build(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java index 483ee0b1ac7..f378a53a4dd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java @@ -243,7 +243,7 @@ public class BatchAppenderatorsTest "foo", new TestAppenderatorsManager(), metrics, - new TestTaskToolbox( + makeTaskToolbox( objectMapper, indexMerger, TaskConfig.BatchProcessingMode.OPEN_SEGMENTS @@ -266,7 +266,7 @@ public class BatchAppenderatorsTest "foo", new TestAppenderatorsManager(), metrics, - new TestTaskToolbox( + makeTaskToolbox( objectMapper, indexMerger, TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS @@ -290,7 +290,7 @@ public class BatchAppenderatorsTest "foo", new TestAppenderatorsManager(), metrics, - new TestTaskToolbox( + makeTaskToolbox( objectMapper, indexMerger, TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS @@ -560,71 +560,39 @@ public class BatchAppenderatorsTest } } - - private static class TestTaskToolbox extends TaskToolbox + private static TaskToolbox makeTaskToolbox( + ObjectMapper mapper, + IndexMergerV9 indexMergerV9, + TaskConfig.BatchProcessingMode mode + ) { - private final Map segmentFileMap; - - TestTaskToolbox(ObjectMapper mapper, IndexMergerV9 indexMergerV9, TaskConfig.BatchProcessingMode mode) - { - super( - new TaskConfig( - null, - null, - null, - null, - null, - false, - null, - null, - null, - false, - false, - mode.name(), - null - ), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - NoopJoinableFactory.INSTANCE, - null, - null, - mapper, - null, - new IndexIO( - new ObjectMapper(), - () -> 0 - ), - null, - null, - null, - indexMergerV9, - null, - null, - null, - null, - new NoopTestTaskReportFileWriter(), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - null, - new TestAppenderatorsManager(), - null, - null, - null, - null - ); - this.segmentFileMap = null; - } + return new TaskToolbox.Builder() + .config( + new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + mode.name(), + null + ) + ) + .joinableFactory(NoopJoinableFactory.INSTANCE) + .jsonMapper(mapper) + .indexIO(new IndexIO(new ObjectMapper(), () -> 0)) + .indexMergerV9(indexMergerV9) + .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) + .chatHandlerProvider(new NoopChatHandlerProvider()) + .appenderatorsManager(new TestAppenderatorsManager()) + .build(); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index bfd184e1375..dfcee9b7e7a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -973,7 +973,7 @@ public class CompactionTaskRunTest extends IngestionTestBase // maxRowsPerSegment is set to 2 inside the runIndexTask methods Pair> result = runIndexTask(); Assert.assertEquals(6, result.rhs.size()); - + final Builder builder = new Builder( DATA_SOURCE, segmentCacheManagerFactory, @@ -1097,7 +1097,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T02:00:00.000Z/2014-01-01T03:00:00.000Z"), realSegmentsAfterFullCompaction.get(2).getInterval() ); - + } @Test @@ -1596,7 +1596,8 @@ public class CompactionTaskRunTest extends IngestionTestBase private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException { final SegmentCacheManager loader = new SegmentLocalCacheManager( - new SegmentLoaderConfig() { + new SegmentLoaderConfig() + { @Override public List getLocations() { @@ -1606,59 +1607,41 @@ public class CompactionTaskRunTest extends IngestionTestBase objectMapper ); - return new TaskToolbox( - new TaskConfig( - null, - null, - null, - null, - null, - false, - null, - null, - null, - false, - false, - TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), - null - ), - null, - createActionClient(task), - null, - new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()), - new NoopDataSegmentKiller(), - null, - null, - null, - null, - null, - null, - null, - NoopJoinableFactory.INSTANCE, - null, - loader, - objectMapper, - temporaryFolder.newFolder(), - getIndexIO(), - null, - null, - null, - getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)), - null, - null, - null, - null, - new NoopTestTaskReportFileWriter(), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - testUtils.getRowIngestionMetersFactory(), - new TestAppenderatorsManager(), - indexingServiceClient, - coordinatorClient, - null, - null - ); + return new TaskToolbox.Builder() + .config( + new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ) + ) + .taskActionClient(createActionClient(task)) + .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig())) + .dataSegmentKiller(new NoopDataSegmentKiller()) + .joinableFactory(NoopJoinableFactory.INSTANCE) + .segmentCacheManager(loader) + .jsonMapper(objectMapper) + .taskWorkDir(temporaryFolder.newFolder()) + .indexIO(getIndexIO()) + .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) + .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) + .chatHandlerProvider(new NoopChatHandlerProvider()) + .rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory()) + .appenderatorsManager(new TestAppenderatorsManager()) + .indexingServiceClient(indexingServiceClient) + .coordinatorClient(coordinatorClient) + .build(); } private List getCSVFormatRowsFromSegments(List segments) throws Exception diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f0d51f2a78a..a0f85232f11 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -118,6 +118,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -151,6 +152,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -281,7 +283,8 @@ public class CompactionTaskTest binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory()); binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); - binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); + binder.bind(SegmentCacheManagerFactory.class) + .toInstance(new SegmentCacheManagerFactory(objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager()); binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT); } @@ -360,7 +363,7 @@ public class CompactionTaskTest public void setup() { final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP); - toolbox = new TestTaskToolbox( + toolbox = makeTaskToolbox( new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())), testIndexIO, SEGMENT_MAP @@ -428,7 +431,8 @@ public class CompactionTaskTest @Test public void testCreateCompactionTaskWithTransformSpec() { - ClientCompactionTaskTransformSpec transformSpec = new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)); + ClientCompactionTaskTransformSpec transformSpec = + new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)); final Builder builder = new Builder( DATA_SOURCE, segmentCacheManagerFactory, @@ -447,7 +451,7 @@ public class CompactionTaskTest @Test public void testCreateCompactionTaskWithMetricsSpec() { - AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")}; + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new CountAggregatorFactory("cnt")}; final Builder builder = new Builder( DATA_SOURCE, segmentCacheManagerFactory, @@ -1942,82 +1946,87 @@ public class CompactionTaskTest } } - private static class TestTaskToolbox extends TaskToolbox + private static TaskToolbox makeTaskToolbox( + TaskActionClient taskActionClient, + IndexIO indexIO, + Map segments + ) { - private final Map segmentFileMap; - - TestTaskToolbox( - TaskActionClient taskActionClient, - IndexIO indexIO, - Map segmentFileMap - ) + final SegmentCacheManager segmentCacheManager = new SegmentCacheManager() { - super( - new TaskConfig( - null, - null, - null, - null, - null, - false, - null, - null, - null, - false, - false, - TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), - null - ), - null, - taskActionClient, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - NoopJoinableFactory.INSTANCE, - null, - null, - null, - null, - indexIO, - null, - null, - null, - new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true), - null, - null, - null, - null, - new NoopTestTaskReportFileWriter(), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - TEST_UTILS.getRowIngestionMetersFactory(), - new TestAppenderatorsManager(), - INDEXING_SERVICE_CLIENT, - COORDINATOR_CLIENT, - null, - null - ); - this.segmentFileMap = segmentFileMap; - } - - @Override - public Map fetchSegments(List segments) - { - final Map submap = Maps.newHashMapWithExpectedSize(segments.size()); - for (DataSegment segment : segments) { - final File file = Preconditions.checkNotNull(segmentFileMap.get(segment)); - submap.put(segment, file); + @Override + public boolean isSegmentCached(DataSegment segment) + { + throw new UnsupportedOperationException(); } - return submap; - } + + @Override + public File getSegmentFiles(DataSegment segment) + { + return Preconditions.checkNotNull(segments.get(segment)); + } + + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public void cleanup(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec) + { + throw new UnsupportedOperationException(); + } + }; + + return new TaskToolbox.Builder() + .config( + new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ) + ) + .taskActionClient(taskActionClient) + .joinableFactory(NoopJoinableFactory.INSTANCE) + .indexIO(indexIO) + .indexMergerV9(new IndexMergerV9( + OBJECT_MAPPER, + indexIO, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true + )) + .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) + .chatHandlerProvider(new NoopChatHandlerProvider()) + .rowIngestionMetersFactory(TEST_UTILS.getRowIngestionMetersFactory()) + .appenderatorsManager(new TestAppenderatorsManager()) + .indexingServiceClient(INDEXING_SERVICE_CLIENT) + .coordinatorClient(COORDINATOR_CLIENT) + .segmentCacheManager(segmentCacheManager) + .build(); } private static class TestTaskActionClient implements TaskActionClient diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 7652f2fbf0c..1de41f57733 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -314,59 +314,41 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest StringUtils.format("ingestionTestBase-%s.json", System.currentTimeMillis()) ); - final TaskToolbox box = new TaskToolbox( - new TaskConfig( - null, - null, - null, - null, - null, - false, - null, - null, - null, - false, - false, - TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), - null - ), - new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), - taskActionClient, - null, - new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()), - new NoopDataSegmentKiller(), - null, - null, - null, - null, - null, - null, - null, - NoopJoinableFactory.INSTANCE, - null, - null, - objectMapper, - temporaryFolder.newFolder(), - getIndexIO(), - null, - null, - null, - testUtils.getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)), - null, - null, - null, - null, - new SingleFileTaskReportFileWriter(taskReportsFile), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - testUtils.getRowIngestionMetersFactory(), - new TestAppenderatorsManager(), - new NoopIndexingServiceClient(), - null, - null, - null - ); + final TaskToolbox box = new TaskToolbox.Builder() + .config( + new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ) + ) + .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false)) + .taskActionClient(taskActionClient) + .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig())) + .dataSegmentKiller(new NoopDataSegmentKiller()) + .joinableFactory(NoopJoinableFactory.INSTANCE) + .jsonMapper(objectMapper) + .taskWorkDir(temporaryFolder.newFolder()) + .indexIO(getIndexIO()) + .indexMergerV9(testUtils.getIndexMergerV9Factory() + .create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) + .taskReportFileWriter(new SingleFileTaskReportFileWriter(taskReportsFile)) + .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) + .chatHandlerProvider(new NoopChatHandlerProvider()) + .rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory()) + .appenderatorsManager(new TestAppenderatorsManager()) + .indexingServiceClient(new NoopIndexingServiceClient()) + .build(); if (task.isReady(box.getTaskActionClient())) { return Futures.immediateFuture(task.run(box)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 835e24b9663..0bf1021a891 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -689,68 +689,56 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException { - return new TaskToolbox( - new TaskConfig( - null, - null, - null, - null, - null, - false, - null, - null, - null, - false, - false, - TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), - null - ), - new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), - actionClient, - null, - new LocalDataSegmentPusher( - new LocalDataSegmentPusherConfig() - { - @Override - public File getStorageDirectory() - { - return localDeepStorage; - } - } - ), - new NoopDataSegmentKiller(), - null, - null, - null, - null, - null, - null, - null, - NoopJoinableFactory.INSTANCE, - null, - newSegmentLoader(temporaryFolder.newFolder()), - objectMapper, - temporaryFolder.newFolder(task.getId()), - getIndexIO(), - null, - null, - null, - getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)), - null, - null, - null, - null, - new NoopTestTaskReportFileWriter(), - intermediaryDataManager, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - new TestUtils().getRowIngestionMetersFactory(), - new TestAppenderatorsManager(), - indexingServiceClient, - coordinatorClient, - new LocalParallelIndexTaskClientFactory(taskRunner, transientApiCallFailureRate), - new LocalShuffleClient(intermediaryDataManager) - ); + return new TaskToolbox.Builder() + .config( + new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ) + ) + .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false)) + .taskActionClient(actionClient) + .segmentPusher( + new LocalDataSegmentPusher( + new LocalDataSegmentPusherConfig() + { + @Override + public File getStorageDirectory() + { + return localDeepStorage; + } + } + ) + ) + .dataSegmentKiller(new NoopDataSegmentKiller()) + .joinableFactory(NoopJoinableFactory.INSTANCE) + .segmentCacheManager(newSegmentLoader(temporaryFolder.newFolder())) + .jsonMapper(objectMapper) + .taskWorkDir(temporaryFolder.newFolder(task.getId())) + .indexIO(getIndexIO()) + .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) + .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + .intermediaryDataManager(intermediaryDataManager) + .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) + .chatHandlerProvider(new NoopChatHandlerProvider()) + .rowIngestionMetersFactory(new TestUtils().getRowIngestionMetersFactory()) + .appenderatorsManager(new TestAppenderatorsManager()) + .indexingServiceClient(indexingServiceClient) + .coordinatorClient(coordinatorClient) + .supervisorTaskClientFactory(new LocalParallelIndexTaskClientFactory(taskRunner, transientApiCallFailureRate)) + .shuffleClient(new LocalShuffleClient(intermediaryDataManager)) + .build(); } static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask