Add builder for TaskToolbox. (#12539)

* Add builder for TaskToolbox.

The main purpose of this change is to make it easier to create
TaskToolboxes in tests. However, the builder is used in production
too, by TaskToolboxFactory.

* Fix imports, adjust formatting.

* Fix import.
This commit is contained in:
Gian Merlino 2022-05-19 07:43:50 -07:00 committed by GitHub
parent 4631cff2a9
commit 485de6a14a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 600 additions and 354 deletions

View File

@ -286,6 +286,7 @@ public class TaskToolbox
/**
* Adds a monitor to the monitorScheduler if it is configured
*
* @param monitor
*/
public void addMonitor(Monitor monitor)
@ -298,6 +299,7 @@ public class TaskToolbox
/**
* Adds a monitor to the monitorScheduler if it is configured
*
* @param monitor
*/
public void removeMonitor(Monitor monitor)
@ -459,4 +461,314 @@ public class TaskToolbox
{
return shuffleClient;
}
public static class Builder
{
private TaskConfig config;
private DruidNode taskExecutorNode;
private TaskActionClient taskActionClient;
private ServiceEmitter emitter;
private DataSegmentPusher segmentPusher;
private DataSegmentKiller dataSegmentKiller;
private DataSegmentMover dataSegmentMover;
private DataSegmentArchiver dataSegmentArchiver;
private DataSegmentAnnouncer segmentAnnouncer;
private DataSegmentServerAnnouncer serverAnnouncer;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private QueryProcessingPool queryProcessingPool;
private JoinableFactory joinableFactory;
private Provider<MonitorScheduler> monitorSchedulerProvider;
private SegmentCacheManager segmentCacheManager;
private ObjectMapper jsonMapper;
private File taskWorkDir;
private IndexIO indexIO;
private Cache cache;
private CacheConfig cacheConfig;
private CachePopulatorStats cachePopulatorStats;
private IndexMergerV9 indexMergerV9;
private DruidNodeAnnouncer druidNodeAnnouncer;
private DruidNode druidNode;
private LookupNodeService lookupNodeService;
private DataNodeService dataNodeService;
private TaskReportFileWriter taskReportFileWriter;
private AuthorizerMapper authorizerMapper;
private ChatHandlerProvider chatHandlerProvider;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private AppenderatorsManager appenderatorsManager;
private IndexingServiceClient indexingServiceClient;
private CoordinatorClient coordinatorClient;
private IntermediaryDataManager intermediaryDataManager;
private IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> supervisorTaskClientFactory;
private ShuffleClient shuffleClient;
public Builder()
{
}
public Builder config(final TaskConfig config)
{
this.config = config;
return this;
}
public Builder taskExecutorNode(final DruidNode taskExecutorNode)
{
this.taskExecutorNode = taskExecutorNode;
return this;
}
public Builder taskActionClient(final TaskActionClient taskActionClient)
{
this.taskActionClient = taskActionClient;
return this;
}
public Builder emitter(final ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
}
public Builder segmentPusher(final DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public Builder dataSegmentKiller(final DataSegmentKiller dataSegmentKiller)
{
this.dataSegmentKiller = dataSegmentKiller;
return this;
}
public Builder dataSegmentMover(final DataSegmentMover dataSegmentMover)
{
this.dataSegmentMover = dataSegmentMover;
return this;
}
public Builder dataSegmentArchiver(final DataSegmentArchiver dataSegmentArchiver)
{
this.dataSegmentArchiver = dataSegmentArchiver;
return this;
}
public Builder segmentAnnouncer(final DataSegmentAnnouncer segmentAnnouncer)
{
this.segmentAnnouncer = segmentAnnouncer;
return this;
}
public Builder serverAnnouncer(final DataSegmentServerAnnouncer serverAnnouncer)
{
this.serverAnnouncer = serverAnnouncer;
return this;
}
public Builder handoffNotifierFactory(final SegmentHandoffNotifierFactory handoffNotifierFactory)
{
this.handoffNotifierFactory = handoffNotifierFactory;
return this;
}
public Builder queryRunnerFactoryConglomerateProvider(final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider)
{
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
return this;
}
public Builder queryProcessingPool(final QueryProcessingPool queryProcessingPool)
{
this.queryProcessingPool = queryProcessingPool;
return this;
}
public Builder joinableFactory(final JoinableFactory joinableFactory)
{
this.joinableFactory = joinableFactory;
return this;
}
public Builder monitorSchedulerProvider(final Provider<MonitorScheduler> monitorSchedulerProvider)
{
this.monitorSchedulerProvider = monitorSchedulerProvider;
return this;
}
public Builder segmentCacheManager(final SegmentCacheManager segmentCacheManager)
{
this.segmentCacheManager = segmentCacheManager;
return this;
}
public Builder jsonMapper(final ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
return this;
}
public Builder taskWorkDir(final File taskWorkDir)
{
this.taskWorkDir = taskWorkDir;
return this;
}
public Builder indexIO(final IndexIO indexIO)
{
this.indexIO = indexIO;
return this;
}
public Builder cache(final Cache cache)
{
this.cache = cache;
return this;
}
public Builder cacheConfig(final CacheConfig cacheConfig)
{
this.cacheConfig = cacheConfig;
return this;
}
public Builder cachePopulatorStats(final CachePopulatorStats cachePopulatorStats)
{
this.cachePopulatorStats = cachePopulatorStats;
return this;
}
public Builder indexMergerV9(final IndexMergerV9 indexMergerV9)
{
this.indexMergerV9 = indexMergerV9;
return this;
}
public Builder druidNodeAnnouncer(final DruidNodeAnnouncer druidNodeAnnouncer)
{
this.druidNodeAnnouncer = druidNodeAnnouncer;
return this;
}
public Builder druidNode(final DruidNode druidNode)
{
this.druidNode = druidNode;
return this;
}
public Builder lookupNodeService(final LookupNodeService lookupNodeService)
{
this.lookupNodeService = lookupNodeService;
return this;
}
public Builder dataNodeService(final DataNodeService dataNodeService)
{
this.dataNodeService = dataNodeService;
return this;
}
public Builder taskReportFileWriter(final TaskReportFileWriter taskReportFileWriter)
{
this.taskReportFileWriter = taskReportFileWriter;
return this;
}
public Builder authorizerMapper(final AuthorizerMapper authorizerMapper)
{
this.authorizerMapper = authorizerMapper;
return this;
}
public Builder chatHandlerProvider(final ChatHandlerProvider chatHandlerProvider)
{
this.chatHandlerProvider = chatHandlerProvider;
return this;
}
public Builder rowIngestionMetersFactory(final RowIngestionMetersFactory rowIngestionMetersFactory)
{
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
return this;
}
public Builder appenderatorsManager(final AppenderatorsManager appenderatorsManager)
{
this.appenderatorsManager = appenderatorsManager;
return this;
}
public Builder indexingServiceClient(final IndexingServiceClient indexingServiceClient)
{
this.indexingServiceClient = indexingServiceClient;
return this;
}
public Builder coordinatorClient(final CoordinatorClient coordinatorClient)
{
this.coordinatorClient = coordinatorClient;
return this;
}
public Builder intermediaryDataManager(final IntermediaryDataManager intermediaryDataManager)
{
this.intermediaryDataManager = intermediaryDataManager;
return this;
}
public Builder supervisorTaskClientFactory(final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> supervisorTaskClientFactory)
{
this.supervisorTaskClientFactory = supervisorTaskClientFactory;
return this;
}
public Builder shuffleClient(final ShuffleClient shuffleClient)
{
this.shuffleClient = shuffleClient;
return this;
}
public TaskToolbox build()
{
return new TaskToolbox(
config,
taskExecutorNode,
taskActionClient,
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
segmentCacheManager,
jsonMapper,
taskWorkDir,
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
indexMergerV9,
druidNodeAnnouncer,
druidNode,
lookupNodeService,
dataNodeService,
taskReportFileWriter,
intermediaryDataManager,
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory,
appenderatorsManager,
indexingServiceClient,
coordinatorClient,
supervisorTaskClientFactory,
shuffleClient
);
}
}
}

View File

@ -189,44 +189,48 @@ public class TaskToolboxFactory
public TaskToolbox build(Task task)
{
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox(
config,
taskExecutorNode,
taskActionClientFactory.create(task),
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
segmentCacheManagerFactory.manufacturate(taskWorkDir),
jsonMapper,
taskWorkDir,
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
indexMergerV9Factory.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())),
druidNodeAnnouncer,
druidNode,
lookupNodeService,
dataNodeService,
taskReportFileWriter,
intermediaryDataManager,
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory,
appenderatorsManager,
indexingServiceClient,
coordinatorClient,
supervisorTaskClientFactory,
shuffleClient
);
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(taskExecutorNode)
.taskActionClient(taskActionClientFactory.create(task))
.emitter(emitter)
.segmentPusher(segmentPusher)
.dataSegmentKiller(dataSegmentKiller)
.dataSegmentMover(dataSegmentMover)
.dataSegmentArchiver(dataSegmentArchiver)
.segmentAnnouncer(segmentAnnouncer)
.serverAnnouncer(serverAnnouncer)
.handoffNotifierFactory(handoffNotifierFactory)
.queryRunnerFactoryConglomerateProvider(queryRunnerFactoryConglomerateProvider)
.queryProcessingPool(queryProcessingPool)
.joinableFactory(joinableFactory)
.monitorSchedulerProvider(monitorSchedulerProvider)
.segmentCacheManager(segmentCacheManagerFactory.manufacturate(taskWorkDir))
.jsonMapper(jsonMapper)
.taskWorkDir(taskWorkDir)
.indexIO(indexIO)
.cache(cache)
.cacheConfig(cacheConfig)
.cachePopulatorStats(cachePopulatorStats)
.indexMergerV9(
indexMergerV9Factory.create(
task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())
)
)
.druidNodeAnnouncer(druidNodeAnnouncer)
.druidNode(druidNode)
.lookupNodeService(lookupNodeService)
.dataNodeService(dataNodeService)
.taskReportFileWriter(taskReportFileWriter)
.intermediaryDataManager(intermediaryDataManager)
.authorizerMapper(authorizerMapper)
.chatHandlerProvider(chatHandlerProvider)
.rowIngestionMetersFactory(rowIngestionMetersFactory)
.appenderatorsManager(appenderatorsManager)
.indexingServiceClient(indexingServiceClient)
.coordinatorClient(coordinatorClient)
.supervisorTaskClientFactory(supervisorTaskClientFactory)
.shuffleClient(shuffleClient)
.build();
}
}

View File

@ -243,7 +243,7 @@ public class BatchAppenderatorsTest
"foo",
new TestAppenderatorsManager(),
metrics,
new TestTaskToolbox(
makeTaskToolbox(
objectMapper,
indexMerger,
TaskConfig.BatchProcessingMode.OPEN_SEGMENTS
@ -266,7 +266,7 @@ public class BatchAppenderatorsTest
"foo",
new TestAppenderatorsManager(),
metrics,
new TestTaskToolbox(
makeTaskToolbox(
objectMapper,
indexMerger,
TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS
@ -290,7 +290,7 @@ public class BatchAppenderatorsTest
"foo",
new TestAppenderatorsManager(),
metrics,
new TestTaskToolbox(
makeTaskToolbox(
objectMapper,
indexMerger,
TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS
@ -560,71 +560,39 @@ public class BatchAppenderatorsTest
}
}
private static class TestTaskToolbox extends TaskToolbox
private static TaskToolbox makeTaskToolbox(
ObjectMapper mapper,
IndexMergerV9 indexMergerV9,
TaskConfig.BatchProcessingMode mode
)
{
private final Map<DataSegment, File> segmentFileMap;
TestTaskToolbox(ObjectMapper mapper, IndexMergerV9 indexMergerV9, TaskConfig.BatchProcessingMode mode)
{
super(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
mode.name(),
null
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
mapper,
null,
new IndexIO(
new ObjectMapper(),
() -> 0
),
null,
null,
null,
indexMergerV9,
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
null,
new TestAppenderatorsManager(),
null,
null,
null,
null
);
this.segmentFileMap = null;
}
return new TaskToolbox.Builder()
.config(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
mode.name(),
null
)
)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(mapper)
.indexIO(new IndexIO(new ObjectMapper(), () -> 0))
.indexMergerV9(indexMergerV9)
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.appenderatorsManager(new TestAppenderatorsManager())
.build();
}
}
}

View File

@ -973,7 +973,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// maxRowsPerSegment is set to 2 inside the runIndexTask methods
Pair<TaskStatus, List<DataSegment>> result = runIndexTask();
Assert.assertEquals(6, result.rhs.size());
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
@ -1097,7 +1097,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Intervals.of("2014-01-01T02:00:00.000Z/2014-01-01T03:00:00.000Z"),
realSegmentsAfterFullCompaction.get(2).getInterval()
);
}
@Test
@ -1596,7 +1596,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException
{
final SegmentCacheManager loader = new SegmentLocalCacheManager(
new SegmentLoaderConfig() {
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
@ -1606,59 +1607,41 @@ public class CompactionTaskRunTest extends IngestionTestBase
objectMapper
);
return new TaskToolbox(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
),
null,
createActionClient(task),
null,
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()),
new NoopDataSegmentKiller(),
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
loader,
objectMapper,
temporaryFolder.newFolder(),
getIndexIO(),
null,
null,
null,
getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)),
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
testUtils.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
indexingServiceClient,
coordinatorClient,
null,
null
);
return new TaskToolbox.Builder()
.config(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
)
)
.taskActionClient(createActionClient(task))
.segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.segmentCacheManager(loader)
.jsonMapper(objectMapper)
.taskWorkDir(temporaryFolder.newFolder())
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.indexingServiceClient(indexingServiceClient)
.coordinatorClient(coordinatorClient)
.build();
}
private List<String> getCSVFormatRowsFromSegments(List<DataSegment> segments) throws Exception

View File

@ -118,6 +118,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -151,6 +152,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -281,7 +283,8 @@ public class CompactionTaskTest
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory());
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
binder.bind(SegmentCacheManagerFactory.class)
.toInstance(new SegmentCacheManagerFactory(objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT);
}
@ -360,7 +363,7 @@ public class CompactionTaskTest
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
toolbox = new TestTaskToolbox(
toolbox = makeTaskToolbox(
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
SEGMENT_MAP
@ -428,7 +431,8 @@ public class CompactionTaskTest
@Test
public void testCreateCompactionTaskWithTransformSpec()
{
ClientCompactionTaskTransformSpec transformSpec = new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null));
ClientCompactionTaskTransformSpec transformSpec =
new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null));
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
@ -447,7 +451,7 @@ public class CompactionTaskTest
@Test
public void testCreateCompactionTaskWithMetricsSpec()
{
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new CountAggregatorFactory("cnt")};
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
@ -1942,82 +1946,87 @@ public class CompactionTaskTest
}
}
private static class TestTaskToolbox extends TaskToolbox
private static TaskToolbox makeTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
Map<DataSegment, File> segments
)
{
private final Map<DataSegment, File> segmentFileMap;
TestTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
Map<DataSegment, File> segmentFileMap
)
final SegmentCacheManager segmentCacheManager = new SegmentCacheManager()
{
super(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
),
null,
taskActionClient,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
null,
null,
indexIO,
null,
null,
null,
new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true),
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
TEST_UTILS.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
null,
null
);
this.segmentFileMap = segmentFileMap;
}
@Override
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
{
final Map<DataSegment, File> submap = Maps.newHashMapWithExpectedSize(segments.size());
for (DataSegment segment : segments) {
final File file = Preconditions.checkNotNull(segmentFileMap.get(segment));
submap.put(segment, file);
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new UnsupportedOperationException();
}
return submap;
}
@Override
public File getSegmentFiles(DataSegment segment)
{
return Preconditions.checkNotNull(segments.get(segment));
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
{
throw new UnsupportedOperationException();
}
};
return new TaskToolbox.Builder()
.config(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
)
)
.taskActionClient(taskActionClient)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.indexIO(indexIO)
.indexMergerV9(new IndexMergerV9(
OBJECT_MAPPER,
indexIO,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true
))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(TEST_UTILS.getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.indexingServiceClient(INDEXING_SERVICE_CLIENT)
.coordinatorClient(COORDINATOR_CLIENT)
.segmentCacheManager(segmentCacheManager)
.build();
}
private static class TestTaskActionClient implements TaskActionClient

View File

@ -314,59 +314,41 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
StringUtils.format("ingestionTestBase-%s.json", System.currentTimeMillis())
);
final TaskToolbox box = new TaskToolbox(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClient,
null,
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()),
new NoopDataSegmentKiller(),
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
objectMapper,
temporaryFolder.newFolder(),
getIndexIO(),
null,
null,
null,
testUtils.getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)),
null,
null,
null,
null,
new SingleFileTaskReportFileWriter(taskReportsFile),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
testUtils.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
new NoopIndexingServiceClient(),
null,
null,
null
);
final TaskToolbox box = new TaskToolbox.Builder()
.config(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
)
)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(taskActionClient)
.segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(temporaryFolder.newFolder())
.indexIO(getIndexIO())
.indexMergerV9(testUtils.getIndexMergerV9Factory()
.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new SingleFileTaskReportFileWriter(taskReportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.indexingServiceClient(new NoopIndexingServiceClient())
.build();
if (task.isReady(box.getTaskActionClient())) {
return Futures.immediateFuture(task.run(box));

View File

@ -689,68 +689,56 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
return new TaskToolbox(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
actionClient,
null,
new LocalDataSegmentPusher(
new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return localDeepStorage;
}
}
),
new NoopDataSegmentKiller(),
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
newSegmentLoader(temporaryFolder.newFolder()),
objectMapper,
temporaryFolder.newFolder(task.getId()),
getIndexIO(),
null,
null,
null,
getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)),
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
intermediaryDataManager,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
new TestUtils().getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
indexingServiceClient,
coordinatorClient,
new LocalParallelIndexTaskClientFactory(taskRunner, transientApiCallFailureRate),
new LocalShuffleClient(intermediaryDataManager)
);
return new TaskToolbox.Builder()
.config(
new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
)
)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(actionClient)
.segmentPusher(
new LocalDataSegmentPusher(
new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return localDeepStorage;
}
}
)
)
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.segmentCacheManager(newSegmentLoader(temporaryFolder.newFolder()))
.jsonMapper(objectMapper)
.taskWorkDir(temporaryFolder.newFolder(task.getId()))
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.intermediaryDataManager(intermediaryDataManager)
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(new TestUtils().getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.indexingServiceClient(indexingServiceClient)
.coordinatorClient(coordinatorClient)
.supervisorTaskClientFactory(new LocalParallelIndexTaskClientFactory(taskRunner, transientApiCallFailureRate))
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.build();
}
static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask