From 3c5b997ce759c30673c246fcd3331bcb099a9df1 Mon Sep 17 00:00:00 2001 From: Scott Aslan Date: Fri, 16 Feb 2024 11:15:44 -0500 Subject: [PATCH] [NIFI-12761] refresh RPG (#8401) * [NIFI-12761] refresh RPG * address feedback * stop polling on error * fix linting errors introduced in this PR * reload the connections * refresh rpg, if first call does not have updated timestamp start polling * rename action * final touches This closes #8401 --- .../service/canvas-context-menu.service.ts | 24 ++++--- .../flow-designer/state/flow/flow.actions.ts | 14 +++- .../flow-designer/state/flow/flow.effects.ts | 67 +++++++++++++++++++ .../flow-designer/state/flow/flow.reducer.ts | 22 ++++++ .../state/flow/flow.selectors.ts | 2 + .../pages/flow-designer/state/flow/index.ts | 11 +++ 6 files changed, 130 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts index 45fb9b262e..01e3f8e440 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts @@ -37,6 +37,7 @@ import { navigateToViewStatusHistoryForComponent, reloadFlow, replayLastProvenanceEvent, + requestRefreshRemoteProcessGroup, runOnce, startComponents, startCurrentProcessGroup, @@ -750,12 +751,19 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider { }, { condition: (selection: any) => { - return this.canvasUtils.isRemoteProcessGroup(selection); + return this.canvasUtils.canRead(selection) && this.canvasUtils.isRemoteProcessGroup(selection); }, clazz: 'fa fa-refresh', text: 'Refresh remote', - action: () => { - // TODO - refreshRemoteFlow + action: (selection: any) => { + const d = selection.datum(); + const id = d.id; + const refreshTimestamp = d.component.flowRefreshed; + const request = { + id, + refreshTimestamp + }; + this.store.dispatch(requestRefreshRemoteProcessGroup({ request })); } }, { @@ -964,17 +972,15 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider { }, { condition: (selection: any) => { - return this.canvasUtils.isRemoteProcessGroup(selection); + return this.canvasUtils.canRead(selection) && this.canvasUtils.isRemoteProcessGroup(selection); }, clazz: 'fa fa-external-link', text: 'Go to', action: (selection: any) => { - if (selection.size() === 1 && this.canvasUtils.isRemoteProcessGroup(selection)) { - const selectionData = selection.datum(); - const uri = selectionData.component.targetUri; + const selectionData = selection.datum(); + const uri = selectionData.component.targetUri; - this.store.dispatch(goToRemoteProcessGroup({ request: { uri } })); - } + this.store.dispatch(goToRemoteProcessGroup({ request: { uri } })); } }, { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts index 89eeccfd49..7dfb10a634 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts @@ -75,7 +75,8 @@ import { CenterComponentRequest, ImportFromRegistryDialogRequest, ImportFromRegistryRequest, - GoToRemoteProcessGroupRequest + GoToRemoteProcessGroupRequest, + RefreshRemoteProcessGroupRequest } from './index'; import { StatusHistoryRequest } from '../../../../state/status-history'; @@ -232,6 +233,17 @@ export const goToRemoteProcessGroup = createAction( props<{ request: GoToRemoteProcessGroupRequest }>() ); +export const refreshRemoteProcessGroup = createAction(`${CANVAS_PREFIX} Refresh Remote Process Group`); + +export const requestRefreshRemoteProcessGroup = createAction( + `${CANVAS_PREFIX} Request Refresh Remote Process Group Polling`, + props<{ request: RefreshRemoteProcessGroupRequest }>() +); + +export const startRemoteProcessGroupPolling = createAction(`${CANVAS_PREFIX} Start Remote Process Group Polling`); + +export const stopRemoteProcessGroupPolling = createAction(`${CANVAS_PREFIX} Stop Remote Process Group Polling`); + export const createProcessGroup = createAction( `${CANVAS_PREFIX} Create Process Group`, props<{ request: CreateProcessGroupRequest }>() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts index eff3dba2cd..02a341f5d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts @@ -58,6 +58,7 @@ import { selectProcessGroup, selectProcessor, selectRemoteProcessGroup, + selectRefreshRpgDetails, selectSaving } from './flow.selectors'; import { ConnectionManager } from '../../service/manager/connection-manager.service'; @@ -67,6 +68,7 @@ import { EditPort } from '../../ui/canvas/items/port/edit-port/edit-port.compone import { BucketEntity, ComponentType, + isDefinedAndNotNull, RegistryClientEntity, VersionedFlowEntity, VersionedFlowSnapshotMetadataEntity @@ -96,6 +98,7 @@ import { ImportFromRegistry } from '../../ui/canvas/items/flow/import-from-regis import { selectCurrentUser } from '../../../../state/current-user/current-user.selectors'; import { NoRegistryClientsDialog } from '../../ui/common/no-registry-clients-dialog/no-registry-clients-dialog.component'; import { EditRemoteProcessGroup } from '../../ui/canvas/items/remote-process-group/edit-remote-process-group/edit-remote-process-group.component'; +import { ErrorHelper } from '../../../../service/error-helper.service'; @Injectable() export class FlowEffects { @@ -359,6 +362,70 @@ export class FlowEffects { { dispatch: false } ); + startRemoteProcessGroupPolling$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.startRemoteProcessGroupPolling), + switchMap(() => { + return interval(3000, asyncScheduler).pipe( + takeUntil(this.actions$.pipe(ofType(FlowActions.stopRemoteProcessGroupPolling))) + ); + }), + switchMap(() => { + return of(FlowActions.refreshRemoteProcessGroup()); + }) + ) + ); + + requestRefreshRemoteProcessGroup$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.requestRefreshRemoteProcessGroup), + switchMap(() => { + return of(FlowActions.refreshRemoteProcessGroup()); + }) + ) + ); + + refreshRemoteProcessGroup$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.refreshRemoteProcessGroup), + concatLatestFrom(() => this.store.select(selectRefreshRpgDetails).pipe(isDefinedAndNotNull())), + switchMap(([, refreshRpgDetails]) => + from( + this.flowService.getRemoteProcessGroup(refreshRpgDetails.request.id).pipe( + map((response: any) => { + const entity = response; + + if (refreshRpgDetails.request.refreshTimestamp !== entity.component.flowRefreshed) { + this.store.dispatch(FlowActions.stopRemoteProcessGroupPolling()); + + // reload the group's connections + this.store.dispatch(FlowActions.loadConnectionsForComponent({ id: entity.id })); + } else { + if (!refreshRpgDetails.polling) { + this.store.dispatch(FlowActions.startRemoteProcessGroupPolling()); + } + + entity.component.flowRefreshed = 'Refreshing...'; + } + + return FlowActions.loadRemoteProcessGroupSuccess({ + response: { + id: entity.id, + remoteProcessGroup: entity + } + }); + }), + catchError((error) => { + this.store.dispatch(FlowActions.stopRemoteProcessGroupPolling()); + + return of(FlowActions.flowApiError({ error: error.error })); + }) + ) + ) + ) + ) + ); + openNewProcessGroupDialog$ = createEffect( () => this.actions$.pipe( diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts index 6daf2432d9..eb47b9ff68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts @@ -38,6 +38,7 @@ import { loadProcessorSuccess, loadRemoteProcessGroupSuccess, navigateWithoutTransform, + requestRefreshRemoteProcessGroup, resetFlowState, runOnce, runOnceSuccess, @@ -49,7 +50,9 @@ import { setTransitionRequired, startComponent, startComponentSuccess, + startRemoteProcessGroupPolling, stopComponentSuccess, + stopRemoteProcessGroupPolling, updateComponent, updateComponentFailure, updateComponentSuccess, @@ -127,6 +130,7 @@ export const initialState: FlowState = { connectedNodeCount: 0, totalNodeCount: 0 }, + refreshRpgDetails: null, controllerBulletins: { bulletins: [], controllerServiceBulletins: [], @@ -150,6 +154,24 @@ export const flowReducer = createReducer( on(resetFlowState, () => ({ ...initialState })), + on(requestRefreshRemoteProcessGroup, (state, { request }) => ({ + ...state, + refreshRpgDetails: { + request, + polling: false + } + })), + on(startRemoteProcessGroupPolling, (state) => { + return produce(state, (draftState) => { + if (draftState.refreshRpgDetails) { + draftState.refreshRpgDetails.polling = true; + } + }); + }), + on(stopRemoteProcessGroupPolling, (state) => ({ + ...state, + refreshRpgDetails: null + })), on(loadProcessGroup, (state, { request }) => ({ ...state, transitionRequired: request.transitionRequired, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts index e940b480ff..acc0e1a374 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts @@ -32,6 +32,8 @@ export const selectSaving = createSelector(selectFlowState, (state: FlowState) = export const selectCurrentProcessGroupId = createSelector(selectFlowState, (state: FlowState) => state.id); +export const selectRefreshRpgDetails = createSelector(selectFlowState, (state: FlowState) => state.refreshRpgDetails); + export const selectCurrentParameterContext = createSelector( selectFlowState, (state: FlowState) => state.flow.processGroupFlow.parameterContext diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts index 69ee7d3c32..38b59006a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts @@ -208,6 +208,16 @@ export interface GoToRemoteProcessGroupRequest { uri: string; } +export interface RefreshRemoteProcessGroupRequest { + id: string; + refreshTimestamp: string; +} + +export interface RefreshRemoteProcessGroupPollingDetailsRequest { + request: RefreshRemoteProcessGroupRequest; + polling: boolean; +} + export interface CreateProcessorRequest extends CreateComponentRequest { processorType: string; processorBundle: Bundle; @@ -503,6 +513,7 @@ export interface FlowState { id: string; flow: ProcessGroupFlowEntity; flowStatus: ControllerStatusEntity; + refreshRpgDetails: RefreshRemoteProcessGroupPollingDetailsRequest | null; clusterSummary: ClusterSummary; controllerBulletins: ControllerBulletinsEntity; dragging: boolean;