diff --git a/web-console/src/druid-models/execution/execution.ts b/web-console/src/druid-models/execution/execution.ts index 4e48528855b..3059a401c4f 100644 --- a/web-console/src/druid-models/execution/execution.ts +++ b/web-console/src/druid-models/execution/execution.ts @@ -77,7 +77,7 @@ export type ExecutionDestination = numTotalRows?: number; } | { type: 'durableStorage'; numTotalRows?: number } - | { type: 'dataSource'; dataSource: string; numTotalRows?: number; loaded?: boolean }; + | { type: 'dataSource'; dataSource: string; numTotalRows?: number }; export interface ExecutionDestinationPage { id: number; @@ -515,19 +515,6 @@ export class Execution { return new Execution(value); } - public markDestinationDatasourceLoaded(): Execution { - const { destination } = this; - if (destination?.type !== 'dataSource') return this; - - return new Execution({ - ...this.valueOf(), - destination: { - ...destination, - loaded: true, - }, - }); - } - public isProcessingData(): boolean { const { status, stages } = this; return Boolean( @@ -549,11 +536,11 @@ export class Execution { switch (segmentStatus?.state) { case 'INIT': - label = 'Waiting for segments loading to start...'; + label = 'Waiting for segment loading to start...'; break; case 'WAITING': - label = 'Waiting for segments loading to complete...'; + label = 'Waiting for segment loading to complete...'; break; case 'SUCCESS': @@ -570,17 +557,6 @@ export class Execution { }; } - public isFullyComplete(): boolean { - if (this.isWaitingForQuery()) return false; - - const { status, destination } = this; - if (status === 'SUCCESS' && destination?.type === 'dataSource') { - return Boolean(destination.loaded); - } - - return true; - } - public getIngestDatasource(): string | undefined { const { destination } = this; if (destination?.type !== 'dataSource') return; @@ -592,9 +568,7 @@ export class Execution { } public isSuccessfulInsert(): boolean { - return Boolean( - this.isFullyComplete() && this.getIngestDatasource() && this.status === 'SUCCESS', - ); + return Boolean(this.status === 'SUCCESS' && this.getIngestDatasource()); } public getErrorMessage(): string | undefined { diff --git a/web-console/src/druid-models/workbench-query/workbench-query.ts b/web-console/src/druid-models/workbench-query/workbench-query.ts index 0c7775e515f..c97c478379e 100644 --- a/web-console/src/druid-models/workbench-query/workbench-query.ts +++ b/web-console/src/druid-models/workbench-query/workbench-query.ts @@ -550,9 +550,12 @@ export class WorkbenchQuery { if (engine === 'sql-msq-task') { apiQuery.context.executionMode ??= 'async'; - apiQuery.context.finalizeAggregations ??= !ingestQuery; - apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery; - apiQuery.context.waitUntilSegmentsLoad ??= true; + if (ingestQuery) { + // Alter these defaults for ingest queries if unset + apiQuery.context.finalizeAggregations ??= false; + apiQuery.context.groupByEnableMultiValueUnnesting ??= false; + apiQuery.context.waitUntilSegmentsLoad ??= true; + } } if (Array.isArray(queryParameters) && queryParameters.length) { diff --git a/web-console/src/helpers/execution/general.ts b/web-console/src/helpers/execution/general.ts index ba6c3a504e3..a90c02a6306 100644 --- a/web-console/src/helpers/execution/general.ts +++ b/web-console/src/helpers/execution/general.ts @@ -22,10 +22,7 @@ import type { CancelToken } from 'axios'; import type { Execution } from '../../druid-models'; import { IntermediateQueryState } from '../../utils'; -import { - updateExecutionWithDatasourceLoadedIfNeeded, - updateExecutionWithTaskIfNeeded, -} from './sql-task-execution'; +import { updateExecutionWithTaskIfNeeded } from './sql-task-execution'; export function extractResult( execution: Execution | IntermediateQueryState, @@ -49,14 +46,13 @@ export async function executionBackgroundStatusCheck( switch (execution.engine) { case 'sql-msq-task': execution = await updateExecutionWithTaskIfNeeded(execution, cancelToken); - execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken); break; default: throw new Error(`can not background check execution for engine ${execution.engine}`); } - if (!execution.isFullyComplete()) return new IntermediateQueryState(execution); + if (execution.isWaitingForQuery()) return new IntermediateQueryState(execution); return execution; } diff --git a/web-console/src/helpers/execution/sql-task-execution.ts b/web-console/src/helpers/execution/sql-task-execution.ts index f690886ad05..f0aa7dde54c 100644 --- a/web-console/src/helpers/execution/sql-task-execution.ts +++ b/web-console/src/helpers/execution/sql-task-execution.ts @@ -16,25 +16,17 @@ * limitations under the License. */ -import { L, QueryResult } from '@druid-toolkit/query'; +import { QueryResult } from '@druid-toolkit/query'; import type { AxiosResponse, CancelToken } from 'axios'; import type { AsyncStatusResponse, MsqTaskPayloadResponse, QueryContext } from '../../druid-models'; import { Execution } from '../../druid-models'; import { Api } from '../../singletons'; -import { - deepGet, - DruidError, - IntermediateQueryState, - queryDruidSql, - QueryManager, -} from '../../utils'; +import { deepGet, DruidError, IntermediateQueryState, QueryManager } from '../../utils'; import { maybeGetClusterCapacity } from '../capacity'; const USE_TASK_PAYLOAD = true; const USE_TASK_REPORTS = true; -const WAIT_FOR_SEGMENT_METADATA_TIMEOUT = 180000; // 3 minutes to wait until segments appear in the metadata -const WAIT_FOR_SEGMENT_LOAD_TIMEOUT = 540000; // 9 minutes to wait for segments to load at all // some executionMode has to be set on the /druid/v2/sql/statements API function ensureExecutionModeIsSet(context: QueryContext | undefined): QueryContext { @@ -57,13 +49,7 @@ export interface SubmitTaskQueryOptions { export async function submitTaskQuery( options: SubmitTaskQueryOptions, ): Promise> { - const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options; - - // setting waitUntilSegmentsLoad to true by default - const context = { - waitUntilSegmentsLoad: true, - ...(options.context || {}), - }; + const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options; let sqlQuery: string; let jsonQuery: Record; @@ -114,15 +100,13 @@ export async function submitTaskQuery( ); } - let execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context); + const execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context); if (onSubmitted) { onSubmitted(execution.id); } - execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken); - - if (execution.isFullyComplete()) return execution; + if (!execution.isWaitingForQuery()) return execution; if (cancelToken) { cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination)); @@ -145,12 +129,11 @@ export async function reattachTaskExecution( try { execution = await getTaskExecution(id, undefined, cancelToken); - execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken); } catch (e) { throw new Error(`Reattaching to query failed due to: ${e.message}`); } - if (execution.isFullyComplete()) return execution; + if (!execution.isWaitingForQuery()) return execution; if (cancelToken) { cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination)); @@ -256,58 +239,6 @@ export async function getTaskExecution( return execution; } -export async function updateExecutionWithDatasourceLoadedIfNeeded( - execution: Execution, - _cancelToken?: CancelToken, -): Promise { - if ( - !(execution.destination?.type === 'dataSource' && !execution.destination.loaded) || - execution.status !== 'SUCCESS' - ) { - return execution; - } - - // This means we don't have to perform the SQL query to check if the segments are loaded - if (execution.queryContext?.waitUntilSegmentsLoad === true) { - return execution.markDestinationDatasourceLoaded(); - } - - const endTime = execution.getEndTime(); - if ( - !endTime || // If endTime is not set (this is not expected to happen) then just bow out - execution.stages?.getLastStage()?.partitionCount === 0 || // No data was meant to be written anyway, nothing to do - endTime.valueOf() + WAIT_FOR_SEGMENT_LOAD_TIMEOUT < Date.now() // Enough time has passed since the query ran... don't bother waiting for segments to load. - ) { - return execution.markDestinationDatasourceLoaded(); - } - - const segmentCheck = await queryDruidSql({ - query: `SELECT - COUNT(*) AS num_segments, - COUNT(*) FILTER (WHERE is_published = 1 AND is_available = 0 AND replication_factor <> 0) AS loading_segments -FROM sys.segments -WHERE datasource = ${L(execution.destination.dataSource)} AND is_overshadowed = 0`, - }); - - const numSegments: number = deepGet(segmentCheck, '0.num_segments') || 0; - const loadingSegments: number = deepGet(segmentCheck, '0.loading_segments') || 0; - - // There appear to be no segments, since we checked above that something was written out we know that they have not shown up in the metadata yet - if (numSegments === 0) { - if (endTime.valueOf() + WAIT_FOR_SEGMENT_METADATA_TIMEOUT < Date.now()) { - // Enough time has passed since the query ran... give up waiting for segments to show up in metadata. - return execution.markDestinationDatasourceLoaded(); - } - - return execution; - } - - // There are segments, and we are still waiting for some of them to load - if (loadingSegments > 0) return execution; - - return execution.markDestinationDatasourceLoaded(); -} - function cancelTaskExecutionOnCancel( id: string, cancelToken: CancelToken, diff --git a/web-console/src/views/workbench-view/run-panel/run-panel.tsx b/web-console/src/views/workbench-view/run-panel/run-panel.tsx index 51e18ad4136..ac67c40ff27 100644 --- a/web-console/src/views/workbench-view/run-panel/run-panel.tsx +++ b/web-console/src/views/workbench-view/run-panel/run-panel.tsx @@ -330,7 +330,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) { icon={IconNames.STOPWATCH} text="Wait until segments have loaded" value={waitUntilSegmentsLoad} - undefinedEffectiveValue /* ={true} */ + undefinedEffectiveValue={ingestMode} onValueChange={v => changeQueryContext(changeWaitUntilSegmentsLoad(queryContext, v)) }