diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 824c28d2e3..ad5f200f5d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -524,7 +524,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return; } - throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); + throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified. Retrieve the most up-to-date revision and try again."); } @Override diff --git a/nifi-frontend/src/main/frontend/.gitignore b/nifi-frontend/src/main/frontend/.gitignore index 8224631ef2..581343e403 100644 --- a/nifi-frontend/src/main/frontend/.gitignore +++ b/nifi-frontend/src/main/frontend/.gitignore @@ -36,6 +36,7 @@ yarn-error.log /libpeerconnection.log testem.log /typings +/.tool-versions # System files .DS_Store diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts index a5882f5396..4850df9ab5 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts @@ -87,6 +87,7 @@ import { StartComponentRequest, StartComponentResponse, StartComponentsRequest, + StartPollingProcessorUntilStoppedRequest, StartProcessGroupRequest, StartProcessGroupResponse, StopComponentRequest, @@ -776,6 +777,20 @@ export const pollChangeVersionSuccess = createAction( export const stopPollingChangeVersion = createAction(`${CANVAS_PREFIX} Stop Polling Change Version`); +export const startPollingProcessorUntilStopped = createAction( + `${CANVAS_PREFIX} Start Polling Processor Until Stopped`, + props<{ request: StartPollingProcessorUntilStoppedRequest }>() +); + +export const pollProcessorUntilStopped = createAction(`${CANVAS_PREFIX} Poll Processor Until Stopped`); + +export const pollProcessorUntilStoppedSuccess = createAction( + `${CANVAS_PREFIX} Poll Processor Until Stopped Success`, + props<{ response: LoadProcessorSuccess }>() +); + +export const stopPollingProcessor = createAction(`${CANVAS_PREFIX} Stop Polling Processor`); + export const openSaveVersionDialog = createAction( `${CANVAS_PREFIX} Open Save Flow Version Dialog`, props<{ request: SaveVersionDialogRequest }>() diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts index 576f57114b..7819a895be 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts @@ -45,6 +45,8 @@ import { CreateConnectionDialogRequest, CreateProcessGroupDialogRequest, DeleteComponentResponse, + DisableComponentRequest, + EnableComponentRequest, GroupComponentsDialogRequest, ImportFromRegistryDialogRequest, LoadProcessGroupResponse, @@ -56,6 +58,8 @@ import { SaveVersionRequest, SelectedComponent, Snippet, + StartComponentRequest, + StopComponentRequest, StopVersionControlRequest, StopVersionControlResponse, UpdateComponentFailure, @@ -80,6 +84,7 @@ import { selectParentProcessGroupId, selectProcessGroup, selectProcessor, + selectPollingProcessor, selectRefreshRpgDetails, selectRemoteProcessGroup, selectSaving, @@ -160,6 +165,13 @@ import { selectDocumentVisibilityState } from '../../../../state/document-visibi import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { DocumentVisibility } from '../../../../state/document-visibility'; import { ErrorContextKey } from '../../../../state/error'; +import { + disableComponent, + enableComponent, + startComponent, + startPollingProcessorUntilStopped, + stopComponent +} from './flow.actions'; import { CopyPasteService } from '../../service/copy-paste.service'; import { selectCopiedContent } from '../../../../state/copy/copy.selectors'; import { CopyRequestContext, CopyResponseContext } from '../../../../state/copy'; @@ -1428,6 +1440,7 @@ export class FlowEffects { }), tap(([request, parameterContext, processGroupId]) => { const processorId: string = request.entity.id; + let runStatusChanged: boolean = false; const editDialogReference = this.dialog.open(EditProcessor, { ...XL_DIALOG, @@ -1555,6 +1568,116 @@ export class FlowEffects { }) ); }); + const startPollingIfNecessary = (processorEntity: any): boolean => { + if ( + (processorEntity.status.aggregateSnapshot.runStatus === 'Stopped' && + processorEntity.status.aggregateSnapshot.activeThreadCount > 0) || + processorEntity.status.aggregateSnapshot.runStatus === 'Validating' + ) { + this.store.dispatch( + startPollingProcessorUntilStopped({ + request: { + id: processorEntity.id + } + }) + ); + return true; + } + + return false; + }; + + const pollingStarted = startPollingIfNecessary(request.entity); + + this.store + .select(selectProcessor(processorId)) + .pipe( + takeUntil(editDialogReference.afterClosed()), + isDefinedAndNotNull(), + filter((processorEntity) => { + return ( + (runStatusChanged || pollingStarted) && + processorEntity.revision.clientId === this.client.getClientId() + ); + }), + concatLatestFrom(() => this.store.select(selectPollingProcessor)) + ) + .subscribe(([processorEntity, pollingProcessor]) => { + editDialogReference.componentInstance.processorUpdates = processorEntity; + + // if we're already polling we do not want to start polling again + if (!pollingProcessor) { + startPollingIfNecessary(processorEntity); + } + }); + + editDialogReference.componentInstance.stopComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((stopComponentRequest: StopComponentRequest) => { + runStatusChanged = true; + this.store.dispatch( + stopComponent({ + request: { + id: stopComponentRequest.id, + uri: stopComponentRequest.uri, + type: ComponentType.Processor, + revision: stopComponentRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); + + editDialogReference.componentInstance.disableComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((disableComponentsRequest: DisableComponentRequest) => { + runStatusChanged = true; + this.store.dispatch( + disableComponent({ + request: { + id: disableComponentsRequest.id, + uri: disableComponentsRequest.uri, + type: ComponentType.Processor, + revision: disableComponentsRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); + + editDialogReference.componentInstance.enableComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((enableComponentsRequest: EnableComponentRequest) => { + runStatusChanged = true; + this.store.dispatch( + enableComponent({ + request: { + id: enableComponentsRequest.id, + uri: enableComponentsRequest.uri, + type: ComponentType.Processor, + revision: enableComponentsRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); + + editDialogReference.componentInstance.startComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((startComponentRequest: StartComponentRequest) => { + runStatusChanged = true; + this.store.dispatch( + startComponent({ + request: { + id: startComponentRequest.id, + uri: startComponentRequest.uri, + type: ComponentType.Processor, + revision: startComponentRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); editDialogReference.afterClosed().subscribe((response) => { this.store.dispatch(resetPropertyVerificationState()); @@ -1578,6 +1701,57 @@ export class FlowEffects { { dispatch: false } ); + startPollingProcessorUntilStopped = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.startPollingProcessorUntilStopped), + switchMap(() => + interval(2000, asyncScheduler).pipe( + takeUntil(this.actions$.pipe(ofType(FlowActions.stopPollingProcessor))) + ) + ), + switchMap(() => of(FlowActions.pollProcessorUntilStopped())) + ) + ); + + pollProcessorUntilStopped$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.pollProcessorUntilStopped), + concatLatestFrom(() => [this.store.select(selectPollingProcessor).pipe(isDefinedAndNotNull())]), + switchMap(([, pollingProcessor]) => { + return from( + this.flowService.getProcessor(pollingProcessor.id).pipe( + map((response) => + FlowActions.pollProcessorUntilStoppedSuccess({ + response: { + id: pollingProcessor.id, + processor: response + } + }) + ), + catchError((errorResponse: HttpErrorResponse) => { + this.store.dispatch(FlowActions.stopPollingProcessor()); + return of(this.snackBarOrFullScreenError(errorResponse)); + }) + ) + ); + }) + ) + ); + + pollProcessorUntilStoppedSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.pollProcessorUntilStoppedSuccess), + map((action) => action.response), + filter((response) => { + return ( + response.processor.status.runStatus === 'Stopped' && + response.processor.status.aggregateSnapshot.activeThreadCount === 0 + ); + }), + switchMap(() => of(FlowActions.stopPollingProcessor())) + ) + ); + openEditConnectionDialog$ = createEffect( () => this.actions$.pipe( diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts index cd3ec9f31b..4cff6a7b2d 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts @@ -50,6 +50,7 @@ import { navigateWithoutTransform, pasteSuccess, pollChangeVersionSuccess, + pollProcessorUntilStoppedSuccess, pollRevertChangesSuccess, requestRefreshRemoteProcessGroup, resetFlowState, @@ -68,10 +69,12 @@ import { setTransitionRequired, startComponent, startComponentSuccess, + startPollingProcessorUntilStopped, startProcessGroupSuccess, startRemoteProcessGroupPolling, stopComponent, stopComponentSuccess, + stopPollingProcessor, stopProcessGroupSuccess, stopRemoteProcessGroupPolling, stopVersionControl, @@ -92,6 +95,7 @@ import { produce } from 'immer'; export const initialState: FlowState = { id: 'root', changeVersionRequest: null, + pollingProcessor: null, flow: { revision: { version: 0 @@ -297,7 +301,7 @@ export const flowReducer = createReducer( } }); }), - on(loadProcessorSuccess, (state, { response }) => { + on(loadProcessorSuccess, pollProcessorUntilStoppedSuccess, (state, { response }) => { return produce(state, (draftState) => { const proposedProcessor = response.processor; const componentIndex: number = draftState.flow.processGroupFlow.flow.processors.findIndex( @@ -373,6 +377,14 @@ export const flowReducer = createReducer( saving: false, versionSaving: false })), + on(startPollingProcessorUntilStopped, (state, { request }) => ({ + ...state, + pollingProcessor: request + })), + on(stopPollingProcessor, (state) => ({ + ...state, + pollingProcessor: null + })), on( createProcessor, createProcessGroup, diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts index 101dcb8c77..31e04c68e7 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts @@ -30,6 +30,8 @@ export const selectChangeVersionRequest = createSelector( (state: FlowState) => state.changeVersionRequest ); +export const selectPollingProcessor = createSelector(selectFlowState, (state: FlowState) => state.pollingProcessor); + export const selectSaving = createSelector(selectFlowState, (state: FlowState) => state.saving); export const selectVersionSaving = createSelector(selectFlowState, (state: FlowState) => state.versionSaving); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts index eea1eea233..5160c70c8f 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts @@ -659,6 +659,7 @@ export interface FlowState { flowAnalysisOpen: boolean; versionSaving: boolean; changeVersionRequest: FlowUpdateRequestEntity | null; + pollingProcessor: StartPollingProcessorUntilStoppedRequest | null; status: 'pending' | 'loading' | 'success' | 'complete'; } @@ -792,6 +793,10 @@ export interface StopComponentRequest { errorStrategy: 'snackbar' | 'banner'; } +export interface StartPollingProcessorUntilStoppedRequest { + id: string; +} + export interface StopProcessGroupRequest { id: string; type: ComponentType; diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/header/flow-status/_flow-status.component-theme.scss b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/header/flow-status/_flow-status.component-theme.scss index d753fa35ad..08e581d217 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/header/flow-status/_flow-status.component-theme.scss +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/header/flow-status/_flow-status.component-theme.scss @@ -40,11 +40,6 @@ neutral, map.get(map.get($config, neutral), lighter) ); - $material-theme-neutral-palette-default: mat.get-theme-color( - $material-theme, - neutral, - map.get(map.get($config, neutral), default) - ); $material-theme-primary-palette-default: mat.get-theme-color( $material-theme, diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/_edit-processor.component-theme.scss b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/_edit-processor.component-theme.scss new file mode 100644 index 0000000000..f7cfc4563f --- /dev/null +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/_edit-processor.component-theme.scss @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@use 'sass:map'; +@use '@angular/material' as mat; + +@mixin generate-theme($material-theme, $config) { + $is-material-dark: if(mat.get-theme-type($material-theme) == dark, true, false); + $material-theme-secondary-palette-default: mat.get-theme-color( + $material-theme, + secondary, + map.get(map.get($config, secondary), default) + ); + $material-theme-error-palette-default: mat.get-theme-color( + $material-theme, + error, + map.get(map.get($config, error), default) + ); + + $material-theme-primary-palette-default: mat.get-theme-color( + $material-theme, + primary, + map.get(map.get($config, primary), default) + ); + + $primary-contrast: map.get(map.get($config, primary), contrast); + $caution-contrast: map.get(map.get($config, caution), contrast); + $error-contrast: map.get(map.get($config, error), contrast); + $success: map.get(map.get($config, success), default); + $caution: map.get(map.get($config, caution), default); + + #edit-processor-header { + .bulletins { + background-color: unset; + + .fa { + color: $material-theme-primary-palette-default; + } + } + + .bulletins.has-bulletins { + .fa { + color: $primary-contrast; + } + + &.error { + .fa { + color: $error-contrast; + } + + background-color: $material-theme-error-palette-default; + } + &.warning { + .fa { + color: $caution-contrast; + } + + background-color: $caution; + } + &.info, + &.debug, + &.trace { + background-color: $success; + } + } + } +} diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html index 93a9b035fe..c9ccb9e622 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html @@ -15,19 +15,37 @@ ~ limitations under the License. --> -