Web console: Fix concurrent tasks (#15649)

* Improve handling of concurrent tasks option

* Update snapshots
This commit is contained in:
Vadim Ogievetsky 2024-01-09 16:09:42 -08:00 committed by GitHub
parent 747d973752
commit 85b8cf9f37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1400 additions and 1426 deletions

View File

@ -72,10 +72,6 @@ export interface Field<M> {
hide?: Functor<M, boolean>;
hideInMore?: Functor<M, boolean>;
valueAdjustment?: (value: any) => any;
/**
* An optional callback to transform the value before it is set on the input
*/
adjustValue?: (value: any) => any;
adjustment?: (model: Partial<M>, oldModel: Partial<M>) => Partial<M>;
issueWithValue?: (value: any) => string | undefined;
@ -382,14 +378,12 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
const disabled = AutoForm.evaluateFunctor(field.disabled, model, false);
const intent = required && modelValue == null ? AutoForm.REQUIRED_INTENT : undefined;
const adjustedValue = field.adjustValue ? field.adjustValue(shownValue) : shownValue;
return (
<ButtonGroup large={large}>
<Button
intent={intent}
disabled={disabled}
active={adjustedValue === false}
active={shownValue === false}
onClick={() => {
this.fieldChange(field, false);
if (onFinalize) onFinalize();
@ -400,7 +394,7 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
<Button
intent={intent}
disabled={disabled}
active={adjustedValue === true}
active={shownValue === true}
onClick={() => {
this.fieldChange(field, true);
if (onFinalize) onFinalize();

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
import { Button, Callout, Classes, Code, Dialog, Intent } from '@blueprintjs/core';
import { Button, Callout, Classes, Code, Dialog, Intent, Switch } from '@blueprintjs/core';
import React, { useState } from 'react';
import type { FormJsonTabs } from '../../components';
@ -26,7 +26,7 @@ import {
COMPACTION_CONFIG_FIELDS,
compactionConfigHasLegacyInputSegmentSizeBytesSet,
} from '../../druid-models';
import { deepDelete, formatBytesCompact } from '../../utils';
import { deepDelete, deepGet, deepSet, formatBytesCompact } from '../../utils';
import { CompactionHistoryDialog } from '../compaction-history-dialog/compaction-history-dialog';
import './compaction-config-dialog.scss';
@ -96,11 +96,24 @@ export const CompactionConfigDialog = React.memo(function CompactionConfigDialog
/>
<div className="content">
{currentTab === 'form' ? (
<AutoForm
fields={COMPACTION_CONFIG_FIELDS}
model={currentConfig}
onChange={m => setCurrentConfig(m as CompactionConfig)}
/>
<>
<AutoForm
fields={COMPACTION_CONFIG_FIELDS}
model={currentConfig}
onChange={m => setCurrentConfig(m as CompactionConfig)}
/>
<Switch
label="Allow concurrent compactions (experimental)"
checked={typeof deepGet(currentConfig, 'taskContext.taskLockType') === 'string'}
onChange={() => {
setCurrentConfig(
(typeof deepGet(currentConfig, 'taskContext.taskLockType') === 'string'
? deepDelete(currentConfig, 'taskContext.taskLockType')
: deepSet(currentConfig, 'taskContext.taskLockType', 'REPLACE')) as any,
);
}}
/>
</>
) : (
<JsonInput
value={currentConfig}

View File

@ -354,13 +354,4 @@ export const COMPACTION_CONFIG_FIELDS: Field<CompactionConfig>[] = [
</>
),
},
{
name: 'taskContext.taskLockType',
type: 'boolean',
label: 'Allow concurrent compactions (experimental)',
defaultValue: undefined,
valueAdjustment: v => (v ? 'REPLACE' : undefined),
adjustValue: v => v === 'REPLACE',
info: <p>Allows or forbids concurrent compactions.</p>,
},
];

View File

@ -347,7 +347,10 @@ export function normalizeSpec(spec: Partial<IngestionSpec>): IngestionSpec {
spec = deepSetIfUnset(spec, 'spec.tuningConfig.type', specType);
if (spec.context?.taskLockType !== undefined) {
spec.context.taskLockType = spec.spec?.ioConfig.appendToExisting ? 'APPEND' : 'REPLACE';
spec.context.taskLockType =
isStreamingSpec(spec) || deepGet(spec, 'spec.ioConfig.appendToExisting')
? 'APPEND'
: 'REPLACE';
}
return spec as IngestionSpec;

View File

@ -3134,8 +3134,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const { spec } = this.state;
const parallel = deepGet(spec, 'spec.tuningConfig.maxNumConcurrentSubTasks') > 1;
const appendToExisting = spec.spec?.ioConfig.appendToExisting;
return (
<>
<div className="main">
@ -3161,6 +3159,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
name: 'spec.ioConfig.appendToExisting',
label: 'Append to existing',
type: 'boolean',
defined: s => !isStreamingSpec(s),
defaultValue: false,
// appendToExisting can only be set on 'dynamic' portioning.
// We chose to show it always and instead have a specific message, separate from this form, to notify the user of the issue.
@ -3171,37 +3170,27 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</>
),
},
{
name: 'context.taskLockType',
type: 'boolean',
label: `Allow concurrent ${
appendToExisting ? 'append' : 'replace'
} tasks (experimental)`,
defaultValue: undefined,
valueAdjustment: v => {
if (!v) return undefined;
if (isStreamingSpec(spec)) {
return 'APPEND';
} else {
return appendToExisting ? 'APPEND' : 'REPLACE';
}
},
adjustValue: v => {
if (v === undefined) return false;
if (isStreamingSpec(spec)) {
return v === 'APPEND';
}
return v === (appendToExisting ? 'APPEND' : 'REPLACE');
},
info: <p>Allows or forbids concurrent tasks.</p>,
},
]}
model={spec}
onChange={this.updateSpec}
/>
<Switch
label="Allow concurrent tasks (experimental)"
checked={typeof deepGet(spec, 'context.taskLockType') === 'string'}
onChange={() => {
this.updateSpec(
typeof deepGet(spec, 'context.taskLockType') === 'string'
? deepDelete(spec, 'context.taskLockType')
: deepSet(
spec,
'context.taskLockType',
isStreamingSpec(spec) || deepGet(spec, 'spec.ioConfig.appendToExisting')
? 'APPEND'
: 'REPLACE',
),
);
}}
/>
</div>
<div className="other">
<H5>Parse error reporting</H5>