Correct task status returned by controller (#13288)

* Correct worker status returned by controller

* Address review comments
This commit is contained in:
Adarsh Sanjeev 2022-10-31 15:18:19 +05:30 committed by GitHub
parent e1ff3ca289
commit 675fd982fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 6 deletions

View File

@ -90,6 +90,7 @@ import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory; import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.TaskReportMSQDestination; import org.apache.druid.msq.indexing.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.WorkerCount;
import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker; import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
@ -1817,9 +1818,9 @@ public class ControllerImpl implements Controller
int runningTasks = 1; int runningTasks = 1;
if (taskLauncher != null) { if (taskLauncher != null) {
Pair<Integer, Integer> workerTaskStatus = taskLauncher.getWorkerTaskStatus(); WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskStatus.lhs; pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskStatus.rhs + 1; // To account for controller. runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
} }
return new MSQStatusReport( return new MSQStatusReport(
taskState, taskState,

View File

@ -30,7 +30,6 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -350,12 +349,12 @@ public class MSQWorkerTaskLauncher
* Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are * Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are
* not yet fully started as left and right respectively. * not yet fully started as left and right respectively.
*/ */
public Pair<Integer, Integer> getWorkerTaskStatus() public WorkerCount getWorkerTaskCount()
{ {
synchronized (taskIds) { synchronized (taskIds) {
int runningTasks = fullyStartedTasks.size(); int runningTasks = fullyStartedTasks.size();
int pendingTasks = desiredTaskCount - runningTasks; int pendingTasks = desiredTaskCount - runningTasks;
return Pair.of(runningTasks, pendingTasks); return new WorkerCount(runningTasks, pendingTasks);
} }
} }

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing;
/**
* Information about current status of worker tasks
*/
public class WorkerCount
{
private final int runningWorkerCount;
private final int pendingWorkerCount;
public WorkerCount(int runningWorkerCount, int pendingWorkerCount)
{
this.runningWorkerCount = runningWorkerCount;
this.pendingWorkerCount = pendingWorkerCount;
}
public int getRunningWorkerCount()
{
return runningWorkerCount;
}
public int getPendingWorkerCount()
{
return pendingWorkerCount;
}
}