Fix data race in getting results from MSQ select tasks. (#16107)

* Fix data race in getting results from MSQ select tasks.

* Add better logging

* Handling number overflow.
This commit is contained in:
Karan Kumar 2024-03-13 08:58:18 +05:30 committed by GitHub
parent 6f6f86c325
commit 84c5098473
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 25 deletions

View File

@ -462,9 +462,27 @@ public class ControllerImpl implements Controller
log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError)); log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError));
} }
} }
MSQResultsReport resultsReport = null;
if (queryKernel != null && queryKernel.isSuccess()) { if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully. // If successful, encourage the tasks to exit successfully.
// get results before posting finish to the tasks.
if (resultsYielder != null) {
resultsReport = makeResultsTaskReport(
queryDef,
resultsYielder,
task.getQuerySpec().getColumnMappings(),
task.getSqlTypeNames(),
MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
);
try {
resultsYielder.close();
}
catch (IOException e) {
throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e);
}
} else {
resultsReport = null;
}
postFinishToAllTasks(); postFinishToAllTasks();
workerTaskLauncher.stop(false); workerTaskLauncher.stop(false);
} else { } else {
@ -509,7 +527,6 @@ public class ControllerImpl implements Controller
try { try {
// Write report even if something went wrong. // Write report even if something went wrong.
final MSQStagesReport stagesReport; final MSQStagesReport stagesReport;
final MSQResultsReport resultsReport;
if (queryDef != null) { if (queryDef != null) {
final Map<Integer, ControllerStagePhase> stagePhaseMap; final Map<Integer, ControllerStagePhase> stagePhaseMap;
@ -538,18 +555,6 @@ public class ControllerImpl implements Controller
stagesReport = null; stagesReport = null;
} }
if (resultsYielder != null) {
resultsReport = makeResultsTaskReport(
queryDef,
resultsYielder,
task.getQuerySpec().getColumnMappings(),
task.getSqlTypeNames(),
MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
);
} else {
resultsReport = null;
}
final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload( final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload(
makeStatusReport( makeStatusReport(
taskStateForReport, taskStateForReport,
@ -564,7 +569,6 @@ public class ControllerImpl implements Controller
countersSnapshot, countersSnapshot,
resultsReport resultsReport
); );
context.writeReports( context.writeReports(
id(), id(),
TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload)) TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))

View File

@ -28,8 +28,10 @@ import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -39,6 +41,7 @@ import java.util.Objects;
public class MSQResultsReport public class MSQResultsReport
{ {
private static final Logger log = new Logger(MSQResultsReport.class);
/** /**
* Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility * Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
* with SQL (which also allows duplicate column names in query results). * with SQL (which also allows duplicate column names in query results).
@ -83,18 +86,35 @@ public class MSQResultsReport
MSQSelectDestination selectDestination MSQSelectDestination selectDestination
) )
{ {
if (selectDestination.shouldTruncateResultsInTaskReport()) { List<Object[]> results = new ArrayList<>();
List<Object[]> results = new ArrayList<>(); long rowCount = 0;
int rowCount = 0; int factor = 1;
while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) { while (!resultYielder.isDone()) {
results.add(resultYielder.get()); results.add(resultYielder.get());
resultYielder = resultYielder.next(null); resultYielder = resultYielder.next(null);
++rowCount; ++rowCount;
if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) {
break;
}
if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) {
log.warn(
"Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. "
+ "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. "
+ "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.",
rowCount,
Limits.MAX_SELECT_RESULT_ROWS,
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLESTORAGE.getName()
);
factor = factor < 32 ? factor * 2 : 32;
} }
return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone());
} else {
return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false);
} }
return new MSQResultsReport(
signature,
sqlTypeNames,
Yielders.each(Sequences.simple(results)),
!resultYielder.isDone()
);
} }
@JsonProperty("signature") @JsonProperty("signature")