mirror of https://github.com/apache/druid.git
Web console: Don't force waitUntilSegmentLoad to true (#15781)
* Don't force setting waitUntilSegmentsLoad * delete irrelevant code
This commit is contained in:
parent
37d1650ccf
commit
0089f6b905
|
@ -77,7 +77,7 @@ export type ExecutionDestination =
|
|||
numTotalRows?: number;
|
||||
}
|
||||
| { type: 'durableStorage'; numTotalRows?: number }
|
||||
| { type: 'dataSource'; dataSource: string; numTotalRows?: number; loaded?: boolean };
|
||||
| { type: 'dataSource'; dataSource: string; numTotalRows?: number };
|
||||
|
||||
export interface ExecutionDestinationPage {
|
||||
id: number;
|
||||
|
@ -515,19 +515,6 @@ export class Execution {
|
|||
return new Execution(value);
|
||||
}
|
||||
|
||||
public markDestinationDatasourceLoaded(): Execution {
|
||||
const { destination } = this;
|
||||
if (destination?.type !== 'dataSource') return this;
|
||||
|
||||
return new Execution({
|
||||
...this.valueOf(),
|
||||
destination: {
|
||||
...destination,
|
||||
loaded: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public isProcessingData(): boolean {
|
||||
const { status, stages } = this;
|
||||
return Boolean(
|
||||
|
@ -549,11 +536,11 @@ export class Execution {
|
|||
|
||||
switch (segmentStatus?.state) {
|
||||
case 'INIT':
|
||||
label = 'Waiting for segments loading to start...';
|
||||
label = 'Waiting for segment loading to start...';
|
||||
break;
|
||||
|
||||
case 'WAITING':
|
||||
label = 'Waiting for segments loading to complete...';
|
||||
label = 'Waiting for segment loading to complete...';
|
||||
break;
|
||||
|
||||
case 'SUCCESS':
|
||||
|
@ -570,17 +557,6 @@ export class Execution {
|
|||
};
|
||||
}
|
||||
|
||||
public isFullyComplete(): boolean {
|
||||
if (this.isWaitingForQuery()) return false;
|
||||
|
||||
const { status, destination } = this;
|
||||
if (status === 'SUCCESS' && destination?.type === 'dataSource') {
|
||||
return Boolean(destination.loaded);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public getIngestDatasource(): string | undefined {
|
||||
const { destination } = this;
|
||||
if (destination?.type !== 'dataSource') return;
|
||||
|
@ -592,9 +568,7 @@ export class Execution {
|
|||
}
|
||||
|
||||
public isSuccessfulInsert(): boolean {
|
||||
return Boolean(
|
||||
this.isFullyComplete() && this.getIngestDatasource() && this.status === 'SUCCESS',
|
||||
);
|
||||
return Boolean(this.status === 'SUCCESS' && this.getIngestDatasource());
|
||||
}
|
||||
|
||||
public getErrorMessage(): string | undefined {
|
||||
|
|
|
@ -550,10 +550,13 @@ export class WorkbenchQuery {
|
|||
|
||||
if (engine === 'sql-msq-task') {
|
||||
apiQuery.context.executionMode ??= 'async';
|
||||
apiQuery.context.finalizeAggregations ??= !ingestQuery;
|
||||
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
|
||||
if (ingestQuery) {
|
||||
// Alter these defaults for ingest queries if unset
|
||||
apiQuery.context.finalizeAggregations ??= false;
|
||||
apiQuery.context.groupByEnableMultiValueUnnesting ??= false;
|
||||
apiQuery.context.waitUntilSegmentsLoad ??= true;
|
||||
}
|
||||
}
|
||||
|
||||
if (Array.isArray(queryParameters) && queryParameters.length) {
|
||||
apiQuery.parameters = queryParameters;
|
||||
|
|
|
@ -22,10 +22,7 @@ import type { CancelToken } from 'axios';
|
|||
import type { Execution } from '../../druid-models';
|
||||
import { IntermediateQueryState } from '../../utils';
|
||||
|
||||
import {
|
||||
updateExecutionWithDatasourceLoadedIfNeeded,
|
||||
updateExecutionWithTaskIfNeeded,
|
||||
} from './sql-task-execution';
|
||||
import { updateExecutionWithTaskIfNeeded } from './sql-task-execution';
|
||||
|
||||
export function extractResult(
|
||||
execution: Execution | IntermediateQueryState<Execution>,
|
||||
|
@ -49,14 +46,13 @@ export async function executionBackgroundStatusCheck(
|
|||
switch (execution.engine) {
|
||||
case 'sql-msq-task':
|
||||
execution = await updateExecutionWithTaskIfNeeded(execution, cancelToken);
|
||||
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Error(`can not background check execution for engine ${execution.engine}`);
|
||||
}
|
||||
|
||||
if (!execution.isFullyComplete()) return new IntermediateQueryState(execution);
|
||||
if (execution.isWaitingForQuery()) return new IntermediateQueryState(execution);
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
|
|
@ -16,25 +16,17 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { L, QueryResult } from '@druid-toolkit/query';
|
||||
import { QueryResult } from '@druid-toolkit/query';
|
||||
import type { AxiosResponse, CancelToken } from 'axios';
|
||||
|
||||
import type { AsyncStatusResponse, MsqTaskPayloadResponse, QueryContext } from '../../druid-models';
|
||||
import { Execution } from '../../druid-models';
|
||||
import { Api } from '../../singletons';
|
||||
import {
|
||||
deepGet,
|
||||
DruidError,
|
||||
IntermediateQueryState,
|
||||
queryDruidSql,
|
||||
QueryManager,
|
||||
} from '../../utils';
|
||||
import { deepGet, DruidError, IntermediateQueryState, QueryManager } from '../../utils';
|
||||
import { maybeGetClusterCapacity } from '../capacity';
|
||||
|
||||
const USE_TASK_PAYLOAD = true;
|
||||
const USE_TASK_REPORTS = true;
|
||||
const WAIT_FOR_SEGMENT_METADATA_TIMEOUT = 180000; // 3 minutes to wait until segments appear in the metadata
|
||||
const WAIT_FOR_SEGMENT_LOAD_TIMEOUT = 540000; // 9 minutes to wait for segments to load at all
|
||||
|
||||
// some executionMode has to be set on the /druid/v2/sql/statements API
|
||||
function ensureExecutionModeIsSet(context: QueryContext | undefined): QueryContext {
|
||||
|
@ -57,13 +49,7 @@ export interface SubmitTaskQueryOptions {
|
|||
export async function submitTaskQuery(
|
||||
options: SubmitTaskQueryOptions,
|
||||
): Promise<Execution | IntermediateQueryState<Execution>> {
|
||||
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
|
||||
|
||||
// setting waitUntilSegmentsLoad to true by default
|
||||
const context = {
|
||||
waitUntilSegmentsLoad: true,
|
||||
...(options.context || {}),
|
||||
};
|
||||
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
|
||||
|
||||
let sqlQuery: string;
|
||||
let jsonQuery: Record<string, any>;
|
||||
|
@ -114,15 +100,13 @@ export async function submitTaskQuery(
|
|||
);
|
||||
}
|
||||
|
||||
let execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);
|
||||
const execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);
|
||||
|
||||
if (onSubmitted) {
|
||||
onSubmitted(execution.id);
|
||||
}
|
||||
|
||||
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
|
||||
|
||||
if (execution.isFullyComplete()) return execution;
|
||||
if (!execution.isWaitingForQuery()) return execution;
|
||||
|
||||
if (cancelToken) {
|
||||
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
|
||||
|
@ -145,12 +129,11 @@ export async function reattachTaskExecution(
|
|||
|
||||
try {
|
||||
execution = await getTaskExecution(id, undefined, cancelToken);
|
||||
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
|
||||
} catch (e) {
|
||||
throw new Error(`Reattaching to query failed due to: ${e.message}`);
|
||||
}
|
||||
|
||||
if (execution.isFullyComplete()) return execution;
|
||||
if (!execution.isWaitingForQuery()) return execution;
|
||||
|
||||
if (cancelToken) {
|
||||
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
|
||||
|
@ -256,58 +239,6 @@ export async function getTaskExecution(
|
|||
return execution;
|
||||
}
|
||||
|
||||
export async function updateExecutionWithDatasourceLoadedIfNeeded(
|
||||
execution: Execution,
|
||||
_cancelToken?: CancelToken,
|
||||
): Promise<Execution> {
|
||||
if (
|
||||
!(execution.destination?.type === 'dataSource' && !execution.destination.loaded) ||
|
||||
execution.status !== 'SUCCESS'
|
||||
) {
|
||||
return execution;
|
||||
}
|
||||
|
||||
// This means we don't have to perform the SQL query to check if the segments are loaded
|
||||
if (execution.queryContext?.waitUntilSegmentsLoad === true) {
|
||||
return execution.markDestinationDatasourceLoaded();
|
||||
}
|
||||
|
||||
const endTime = execution.getEndTime();
|
||||
if (
|
||||
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
|
||||
execution.stages?.getLastStage()?.partitionCount === 0 || // No data was meant to be written anyway, nothing to do
|
||||
endTime.valueOf() + WAIT_FOR_SEGMENT_LOAD_TIMEOUT < Date.now() // Enough time has passed since the query ran... don't bother waiting for segments to load.
|
||||
) {
|
||||
return execution.markDestinationDatasourceLoaded();
|
||||
}
|
||||
|
||||
const segmentCheck = await queryDruidSql({
|
||||
query: `SELECT
|
||||
COUNT(*) AS num_segments,
|
||||
COUNT(*) FILTER (WHERE is_published = 1 AND is_available = 0 AND replication_factor <> 0) AS loading_segments
|
||||
FROM sys.segments
|
||||
WHERE datasource = ${L(execution.destination.dataSource)} AND is_overshadowed = 0`,
|
||||
});
|
||||
|
||||
const numSegments: number = deepGet(segmentCheck, '0.num_segments') || 0;
|
||||
const loadingSegments: number = deepGet(segmentCheck, '0.loading_segments') || 0;
|
||||
|
||||
// There appear to be no segments, since we checked above that something was written out we know that they have not shown up in the metadata yet
|
||||
if (numSegments === 0) {
|
||||
if (endTime.valueOf() + WAIT_FOR_SEGMENT_METADATA_TIMEOUT < Date.now()) {
|
||||
// Enough time has passed since the query ran... give up waiting for segments to show up in metadata.
|
||||
return execution.markDestinationDatasourceLoaded();
|
||||
}
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
// There are segments, and we are still waiting for some of them to load
|
||||
if (loadingSegments > 0) return execution;
|
||||
|
||||
return execution.markDestinationDatasourceLoaded();
|
||||
}
|
||||
|
||||
function cancelTaskExecutionOnCancel(
|
||||
id: string,
|
||||
cancelToken: CancelToken,
|
||||
|
|
|
@ -330,7 +330,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
|
|||
icon={IconNames.STOPWATCH}
|
||||
text="Wait until segments have loaded"
|
||||
value={waitUntilSegmentsLoad}
|
||||
undefinedEffectiveValue /* ={true} */
|
||||
undefinedEffectiveValue={ingestMode}
|
||||
onValueChange={v =>
|
||||
changeQueryContext(changeWaitUntilSegmentsLoad(queryContext, v))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue