Use durable super sorter intermediate storage only with composable storage (#13748)

* This enables usage of durable storage connector only in case the composable storage feature is enabled.
This commit is contained in:
Rohan Garg 2023-02-06 18:59:18 +05:30 committed by GitHub
parent e16639121f
commit c5835c29a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -691,35 +691,22 @@ public class WorkerImpl implements Worker
final FileOutputChannelFactory fileOutputChannelFactory = final FileOutputChannelFactory fileOutputChannelFactory =
new FileOutputChannelFactory(fileChannelDirectory, frameSize, intermediateSuperSorterLocalStorageTracker); new FileOutputChannelFactory(fileChannelDirectory, frameSize, intermediateSuperSorterLocalStorageTracker);
if (MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext()))) { if (MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext())) &&
if (durableStageStorageEnabled) { durableStageStorageEnabled) {
return new ComposingOutputChannelFactory( return new ComposingOutputChannelFactory(
ImmutableList.of( ImmutableList.of(
fileOutputChannelFactory, fileOutputChannelFactory,
DurableStorageOutputChannelFactory.createStandardImplementation( DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(), task.getControllerTaskId(),
task().getWorkerNumber(), task().getWorkerNumber(),
stageNumber, stageNumber,
task().getId(), task().getId(),
frameSize, frameSize,
MSQTasks.makeStorageConnector(context.injector()), MSQTasks.makeStorageConnector(context.injector()),
tmpDir tmpDir
) )
), ),
frameSize frameSize
);
} else {
return fileOutputChannelFactory;
}
} else if (durableStageStorageEnabled) {
return DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
task().getWorkerNumber(),
stageNumber,
task().getId(),
frameSize,
MSQTasks.makeStorageConnector(context.injector()),
tmpDir
); );
} else { } else {
return fileOutputChannelFactory; return fileOutputChannelFactory;