diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index d62bcce04dd..5da74f0a52a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -462,9 +462,27 @@ public class ControllerImpl implements Controller log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError)); } } - + MSQResultsReport resultsReport = null; if (queryKernel != null && queryKernel.isSuccess()) { // If successful, encourage the tasks to exit successfully. + // get results before posting finish to the tasks. + if (resultsYielder != null) { + resultsReport = makeResultsTaskReport( + queryDef, + resultsYielder, + task.getQuerySpec().getColumnMappings(), + task.getSqlTypeNames(), + MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context()) + ); + try { + resultsYielder.close(); + } + catch (IOException e) { + throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e); + } + } else { + resultsReport = null; + } postFinishToAllTasks(); workerTaskLauncher.stop(false); } else { @@ -509,7 +527,6 @@ public class ControllerImpl implements Controller try { // Write report even if something went wrong. final MSQStagesReport stagesReport; - final MSQResultsReport resultsReport; if (queryDef != null) { final Map stagePhaseMap; @@ -538,18 +555,6 @@ public class ControllerImpl implements Controller stagesReport = null; } - if (resultsYielder != null) { - resultsReport = makeResultsTaskReport( - queryDef, - resultsYielder, - task.getQuerySpec().getColumnMappings(), - task.getSqlTypeNames(), - MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context()) - ); - } else { - resultsReport = null; - } - final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload( makeStatusReport( taskStateForReport, @@ -564,7 +569,6 @@ public class ControllerImpl implements Controller countersSnapshot, resultsReport ); - context.writeReports( id(), TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload)) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java index db46b420f00..b96ce469145 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java @@ -28,8 +28,10 @@ import org.apache.druid.common.config.Configs; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; @@ -39,6 +41,7 @@ import java.util.Objects; public class MSQResultsReport { + private static final Logger log = new Logger(MSQResultsReport.class); /** * Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility * with SQL (which also allows duplicate column names in query results). @@ -83,18 +86,35 @@ public class MSQResultsReport MSQSelectDestination selectDestination ) { - if (selectDestination.shouldTruncateResultsInTaskReport()) { - List results = new ArrayList<>(); - int rowCount = 0; - while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) { - results.add(resultYielder.get()); - resultYielder = resultYielder.next(null); - ++rowCount; + List results = new ArrayList<>(); + long rowCount = 0; + int factor = 1; + while (!resultYielder.isDone()) { + results.add(resultYielder.get()); + resultYielder = resultYielder.next(null); + ++rowCount; + if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) { + break; + } + if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) { + log.warn( + "Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. " + + "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. " + + "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.", + rowCount, + Limits.MAX_SELECT_RESULT_ROWS, + MultiStageQueryContext.CTX_SELECT_DESTINATION, + MSQSelectDestination.DURABLESTORAGE.getName() + ); + factor = factor < 32 ? factor * 2 : 32; } - return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone()); - } else { - return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false); } + return new MSQResultsReport( + signature, + sqlTypeNames, + Yielders.each(Sequences.simple(results)), + !resultYielder.isDone() + ); } @JsonProperty("signature")