Use controller id while reading from durable storage (#16943)

This commit is contained in:
Adarsh Sanjeev 2024-08-23 11:37:42 +05:30 committed by GitHub
parent e2516d9a67
commit 2abcb41559
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 5 additions and 1 deletions

View File

@ -110,6 +110,7 @@ import java.util.stream.Collectors;
*/
public class RunWorkOrder
{
private final String controllerTaskId;
private final WorkOrder workOrder;
private final InputChannelFactory inputChannelFactory;
private final CounterTracker counterTracker;
@ -138,6 +139,7 @@ public class RunWorkOrder
private ListenableFuture<OutputChannels> stageOutputChannelsFuture;
public RunWorkOrder(
final String controllerTaskId,
final WorkOrder workOrder,
final InputChannelFactory inputChannelFactory,
final CounterTracker counterTracker,
@ -150,6 +152,7 @@ public class RunWorkOrder
final boolean removeNullBytes
)
{
this.controllerTaskId = controllerTaskId;
this.workOrder = workOrder;
this.inputChannelFactory = inputChannelFactory;
this.counterTracker = counterTracker;
@ -565,7 +568,7 @@ public class RunWorkOrder
)
{
return DurableStorageOutputChannelFactory.createStandardImplementation(
workOrder.getQueryDefinition().getQueryId(),
controllerTaskId,
workOrder.getWorkerNumber(),
workOrder.getStageNumber(),
workerContext.workerId(),

View File

@ -408,6 +408,7 @@ public class WorkerImpl implements Worker
final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty();
final RunWorkOrder runWorkOrder = new RunWorkOrder(
task.getControllerTaskId(),
workOrder,
inputChannelFactory,
stageCounters.computeIfAbsent(