[NIFI-12761] refresh RPG (#8401)

* [NIFI-12761] refresh RPG

* address feedback

* stop polling on error

* fix linting errors introduced in this PR

* reload the connections

* refresh rpg, if first call does not have updated timestamp start polling

* rename action

* final touches

This closes #8401
This commit is contained in:
Scott Aslan 2024-02-16 11:15:44 -05:00 committed by GitHub
parent 6f6ddf8960
commit 3c5b997ce7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 130 additions and 10 deletions

View File

@ -37,6 +37,7 @@ import {
navigateToViewStatusHistoryForComponent,
reloadFlow,
replayLastProvenanceEvent,
requestRefreshRemoteProcessGroup,
runOnce,
startComponents,
startCurrentProcessGroup,
@ -750,12 +751,19 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider {
},
{
condition: (selection: any) => {
return this.canvasUtils.isRemoteProcessGroup(selection);
return this.canvasUtils.canRead(selection) && this.canvasUtils.isRemoteProcessGroup(selection);
},
clazz: 'fa fa-refresh',
text: 'Refresh remote',
action: () => {
// TODO - refreshRemoteFlow
action: (selection: any) => {
const d = selection.datum();
const id = d.id;
const refreshTimestamp = d.component.flowRefreshed;
const request = {
id,
refreshTimestamp
};
this.store.dispatch(requestRefreshRemoteProcessGroup({ request }));
}
},
{
@ -964,17 +972,15 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider {
},
{
condition: (selection: any) => {
return this.canvasUtils.isRemoteProcessGroup(selection);
return this.canvasUtils.canRead(selection) && this.canvasUtils.isRemoteProcessGroup(selection);
},
clazz: 'fa fa-external-link',
text: 'Go to',
action: (selection: any) => {
if (selection.size() === 1 && this.canvasUtils.isRemoteProcessGroup(selection)) {
const selectionData = selection.datum();
const uri = selectionData.component.targetUri;
const selectionData = selection.datum();
const uri = selectionData.component.targetUri;
this.store.dispatch(goToRemoteProcessGroup({ request: { uri } }));
}
this.store.dispatch(goToRemoteProcessGroup({ request: { uri } }));
}
},
{

View File

@ -75,7 +75,8 @@ import {
CenterComponentRequest,
ImportFromRegistryDialogRequest,
ImportFromRegistryRequest,
GoToRemoteProcessGroupRequest
GoToRemoteProcessGroupRequest,
RefreshRemoteProcessGroupRequest
} from './index';
import { StatusHistoryRequest } from '../../../../state/status-history';
@ -232,6 +233,17 @@ export const goToRemoteProcessGroup = createAction(
props<{ request: GoToRemoteProcessGroupRequest }>()
);
export const refreshRemoteProcessGroup = createAction(`${CANVAS_PREFIX} Refresh Remote Process Group`);
export const requestRefreshRemoteProcessGroup = createAction(
`${CANVAS_PREFIX} Request Refresh Remote Process Group Polling`,
props<{ request: RefreshRemoteProcessGroupRequest }>()
);
export const startRemoteProcessGroupPolling = createAction(`${CANVAS_PREFIX} Start Remote Process Group Polling`);
export const stopRemoteProcessGroupPolling = createAction(`${CANVAS_PREFIX} Stop Remote Process Group Polling`);
export const createProcessGroup = createAction(
`${CANVAS_PREFIX} Create Process Group`,
props<{ request: CreateProcessGroupRequest }>()

View File

@ -58,6 +58,7 @@ import {
selectProcessGroup,
selectProcessor,
selectRemoteProcessGroup,
selectRefreshRpgDetails,
selectSaving
} from './flow.selectors';
import { ConnectionManager } from '../../service/manager/connection-manager.service';
@ -67,6 +68,7 @@ import { EditPort } from '../../ui/canvas/items/port/edit-port/edit-port.compone
import {
BucketEntity,
ComponentType,
isDefinedAndNotNull,
RegistryClientEntity,
VersionedFlowEntity,
VersionedFlowSnapshotMetadataEntity
@ -96,6 +98,7 @@ import { ImportFromRegistry } from '../../ui/canvas/items/flow/import-from-regis
import { selectCurrentUser } from '../../../../state/current-user/current-user.selectors';
import { NoRegistryClientsDialog } from '../../ui/common/no-registry-clients-dialog/no-registry-clients-dialog.component';
import { EditRemoteProcessGroup } from '../../ui/canvas/items/remote-process-group/edit-remote-process-group/edit-remote-process-group.component';
import { ErrorHelper } from '../../../../service/error-helper.service';
@Injectable()
export class FlowEffects {
@ -359,6 +362,70 @@ export class FlowEffects {
{ dispatch: false }
);
startRemoteProcessGroupPolling$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.startRemoteProcessGroupPolling),
switchMap(() => {
return interval(3000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(FlowActions.stopRemoteProcessGroupPolling)))
);
}),
switchMap(() => {
return of(FlowActions.refreshRemoteProcessGroup());
})
)
);
requestRefreshRemoteProcessGroup$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.requestRefreshRemoteProcessGroup),
switchMap(() => {
return of(FlowActions.refreshRemoteProcessGroup());
})
)
);
refreshRemoteProcessGroup$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.refreshRemoteProcessGroup),
concatLatestFrom(() => this.store.select(selectRefreshRpgDetails).pipe(isDefinedAndNotNull())),
switchMap(([, refreshRpgDetails]) =>
from(
this.flowService.getRemoteProcessGroup(refreshRpgDetails.request.id).pipe(
map((response: any) => {
const entity = response;
if (refreshRpgDetails.request.refreshTimestamp !== entity.component.flowRefreshed) {
this.store.dispatch(FlowActions.stopRemoteProcessGroupPolling());
// reload the group's connections
this.store.dispatch(FlowActions.loadConnectionsForComponent({ id: entity.id }));
} else {
if (!refreshRpgDetails.polling) {
this.store.dispatch(FlowActions.startRemoteProcessGroupPolling());
}
entity.component.flowRefreshed = 'Refreshing...';
}
return FlowActions.loadRemoteProcessGroupSuccess({
response: {
id: entity.id,
remoteProcessGroup: entity
}
});
}),
catchError((error) => {
this.store.dispatch(FlowActions.stopRemoteProcessGroupPolling());
return of(FlowActions.flowApiError({ error: error.error }));
})
)
)
)
)
);
openNewProcessGroupDialog$ = createEffect(
() =>
this.actions$.pipe(

View File

@ -38,6 +38,7 @@ import {
loadProcessorSuccess,
loadRemoteProcessGroupSuccess,
navigateWithoutTransform,
requestRefreshRemoteProcessGroup,
resetFlowState,
runOnce,
runOnceSuccess,
@ -49,7 +50,9 @@ import {
setTransitionRequired,
startComponent,
startComponentSuccess,
startRemoteProcessGroupPolling,
stopComponentSuccess,
stopRemoteProcessGroupPolling,
updateComponent,
updateComponentFailure,
updateComponentSuccess,
@ -127,6 +130,7 @@ export const initialState: FlowState = {
connectedNodeCount: 0,
totalNodeCount: 0
},
refreshRpgDetails: null,
controllerBulletins: {
bulletins: [],
controllerServiceBulletins: [],
@ -150,6 +154,24 @@ export const flowReducer = createReducer(
on(resetFlowState, () => ({
...initialState
})),
on(requestRefreshRemoteProcessGroup, (state, { request }) => ({
...state,
refreshRpgDetails: {
request,
polling: false
}
})),
on(startRemoteProcessGroupPolling, (state) => {
return produce(state, (draftState) => {
if (draftState.refreshRpgDetails) {
draftState.refreshRpgDetails.polling = true;
}
});
}),
on(stopRemoteProcessGroupPolling, (state) => ({
...state,
refreshRpgDetails: null
})),
on(loadProcessGroup, (state, { request }) => ({
...state,
transitionRequired: request.transitionRequired,

View File

@ -32,6 +32,8 @@ export const selectSaving = createSelector(selectFlowState, (state: FlowState) =
export const selectCurrentProcessGroupId = createSelector(selectFlowState, (state: FlowState) => state.id);
export const selectRefreshRpgDetails = createSelector(selectFlowState, (state: FlowState) => state.refreshRpgDetails);
export const selectCurrentParameterContext = createSelector(
selectFlowState,
(state: FlowState) => state.flow.processGroupFlow.parameterContext

View File

@ -208,6 +208,16 @@ export interface GoToRemoteProcessGroupRequest {
uri: string;
}
export interface RefreshRemoteProcessGroupRequest {
id: string;
refreshTimestamp: string;
}
export interface RefreshRemoteProcessGroupPollingDetailsRequest {
request: RefreshRemoteProcessGroupRequest;
polling: boolean;
}
export interface CreateProcessorRequest extends CreateComponentRequest {
processorType: string;
processorBundle: Bundle;
@ -503,6 +513,7 @@ export interface FlowState {
id: string;
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
refreshRpgDetails: RefreshRemoteProcessGroupPollingDetailsRequest | null;
clusterSummary: ClusterSummary;
controllerBulletins: ControllerBulletinsEntity;
dragging: boolean;