NIFI-13155: (#8771)

- Handling newer revisions in flow reducer to ensure that the appropriate version of the component is saved in case responses are received out of order.

This closes #8771
This commit is contained in:
Matt Gilman 2024-05-08 09:57:58 -04:00 committed by GitHub
parent 0ab5e2f741
commit eda98121ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 238 additions and 43 deletions

View File

@ -131,6 +131,8 @@ import { SnippetService } from '../../service/snippet.service';
import { selectTransform } from '../transform/transform.selectors';
import { EditLabel } from '../../ui/canvas/items/label/edit-label/edit-label.component';
import { ErrorHelper } from '../../../../service/error-helper.service';
import { selectConnectedStateChanged } from '../../../../state/cluster-summary/cluster-summary.selectors';
import { resetConnectedStateChanged } from '../../../../state/cluster-summary/cluster-summary.actions';
@Injectable()
export class FlowEffects {
@ -176,8 +178,12 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.loadProcessGroup),
map((action) => action.request),
concatLatestFrom(() => this.store.select(selectFlowLoadingStatus)),
switchMap(([request, status]) =>
concatLatestFrom(() => [
this.store.select(selectFlowLoadingStatus),
this.store.select(selectConnectedStateChanged)
]),
tap(() => this.store.dispatch(resetConnectedStateChanged())),
switchMap(([request, status, connectedStateChanged]) =>
combineLatest([
this.flowService.getFlow(request.id),
this.flowService.getFlowStatus(),
@ -189,7 +195,8 @@ export class FlowEffects {
id: request.id,
flow: flow,
flowStatus: flowStatus,
controllerBulletins: controllerBulletins
controllerBulletins: controllerBulletins,
connectedStateChanged
}
});
}),

View File

@ -83,7 +83,7 @@ import {
updateProcessorSuccess,
uploadProcessGroup
} from './flow.actions';
import { FlowState } from './index';
import { ComponentEntity, FlowState } from './index';
import { ComponentType } from '../../../../state/shared';
import { produce } from 'immer';
@ -125,6 +125,8 @@ export const initialState: FlowState = {
lastRefreshed: ''
}
},
addedCache: [],
removedCache: [],
flowStatus: {
controllerStatus: {
activeThreadCount: 0,
@ -190,56 +192,170 @@ export const flowReducer = createReducer(
})),
on(loadProcessGroup, (state, { request }) => ({
...state,
addedCache: [],
removedCache: [],
transitionRequired: request.transitionRequired,
status: 'loading' as const
})),
on(loadProcessGroupSuccess, (state, { response }) => ({
...state,
id: response.flow.processGroupFlow.id,
flow: response.flow,
flowStatus: response.flowStatus,
controllerBulletins: response.controllerBulletins,
error: null,
status: 'success' as const
})),
on(loadProcessGroupSuccess, (state, { response }) => {
return produce(state, (draftState) => {
draftState.id = response.flow.processGroupFlow.id;
draftState.flow = {
...response.flow,
processGroupFlow: {
...response.flow.processGroupFlow,
flow: {
processors: processComponentCollection(
response.flow.processGroupFlow.flow.processors,
state.flow.processGroupFlow.flow.processors,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
inputPorts: processComponentCollection(
response.flow.processGroupFlow.flow.inputPorts,
state.flow.processGroupFlow.flow.inputPorts,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
outputPorts: processComponentCollection(
response.flow.processGroupFlow.flow.outputPorts,
state.flow.processGroupFlow.flow.outputPorts,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
processGroups: processComponentCollection(
response.flow.processGroupFlow.flow.processGroups,
state.flow.processGroupFlow.flow.processGroups,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
remoteProcessGroups: processComponentCollection(
response.flow.processGroupFlow.flow.remoteProcessGroups,
state.flow.processGroupFlow.flow.remoteProcessGroups,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
funnels: processComponentCollection(
response.flow.processGroupFlow.flow.funnels,
state.flow.processGroupFlow.flow.funnels,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
labels: processComponentCollection(
response.flow.processGroupFlow.flow.labels,
state.flow.processGroupFlow.flow.labels,
state.addedCache,
state.removedCache,
response.connectedStateChanged
),
connections: processComponentCollection(
response.flow.processGroupFlow.flow.connections,
state.flow.processGroupFlow.flow.connections,
state.addedCache,
state.removedCache,
response.connectedStateChanged
)
}
}
};
draftState.flowStatus = response.flowStatus;
draftState.controllerBulletins = response.controllerBulletins;
draftState.addedCache = [];
draftState.removedCache = [];
draftState.status = 'success' as const;
});
}),
on(loadConnectionSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedConnection = response.connection;
const componentIndex: number = draftState.flow.processGroupFlow.flow.connections.findIndex(
(f: any) => response.id === f.id
(f: any) => proposedConnection.id === f.id
);
if (componentIndex > -1) {
draftState.flow.processGroupFlow.flow.connections[componentIndex] = response.connection;
const currentConnection = draftState.flow.processGroupFlow.flow.connections[componentIndex];
const isNewerOrEqualRevision =
proposedConnection.revision.version >= currentConnection.revision.version;
if (isNewerOrEqualRevision) {
draftState.flow.processGroupFlow.flow.connections[componentIndex] = proposedConnection;
}
}
});
}),
on(loadProcessorSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedProcessor = response.processor;
const componentIndex: number = draftState.flow.processGroupFlow.flow.processors.findIndex(
(f: any) => response.id === f.id
(f: any) => proposedProcessor.id === f.id
);
if (componentIndex > -1) {
draftState.flow.processGroupFlow.flow.processors[componentIndex] = response.processor;
const currentProcessor = draftState.flow.processGroupFlow.flow.processors[componentIndex];
const isNewerOrEqualRevision = proposedProcessor.revision.version >= currentProcessor.revision.version;
if (isNewerOrEqualRevision) {
draftState.flow.processGroupFlow.flow.processors[componentIndex] = proposedProcessor;
}
}
});
}),
on(loadInputPortSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedInputPort = response.inputPort;
const componentIndex: number = draftState.flow.processGroupFlow.flow.inputPorts.findIndex(
(f: any) => response.id === f.id
(f: any) => proposedInputPort.id === f.id
);
if (componentIndex > -1) {
draftState.flow.processGroupFlow.flow.inputPorts[componentIndex] = response.inputPort;
const currentInputPort = draftState.flow.processGroupFlow.flow.inputPorts[componentIndex];
const isNewerOrEqualRevision = proposedInputPort.revision.version >= currentInputPort.revision.version;
if (isNewerOrEqualRevision) {
draftState.flow.processGroupFlow.flow.inputPorts[componentIndex] = proposedInputPort;
}
}
});
}),
on(loadRemoteProcessGroupSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedRemoteProcessGroup = response.remoteProcessGroup;
const componentIndex: number = draftState.flow.processGroupFlow.flow.remoteProcessGroups.findIndex(
(f: any) => response.id === f.id
(f: any) => proposedRemoteProcessGroup.id === f.id
);
if (componentIndex > -1) {
draftState.flow.processGroupFlow.flow.remoteProcessGroups[componentIndex] = response.remoteProcessGroup;
const currentRemoteProcessGroup =
draftState.flow.processGroupFlow.flow.remoteProcessGroups[componentIndex];
const isNewerOrEqualRevision =
proposedRemoteProcessGroup.revision.version >= currentRemoteProcessGroup.revision.version;
if (isNewerOrEqualRevision) {
draftState.flow.processGroupFlow.flow.remoteProcessGroups[componentIndex] =
proposedRemoteProcessGroup;
}
}
});
}),
on(loadChildProcessGroupSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedChildProcessGroup = response;
const componentIndex: number = draftState.flow.processGroupFlow.flow.processGroups.findIndex(
(f: any) => proposedChildProcessGroup.id === f.id
);
if (componentIndex > -1) {
const currentChildProcessGroup = draftState.flow.processGroupFlow.flow.processGroups[componentIndex];
const isNewerOrEqualRevision =
proposedChildProcessGroup.revision.version >= currentChildProcessGroup.revision.version;
if (isNewerOrEqualRevision) {
draftState.flow.processGroupFlow.flow.processGroups[componentIndex] = proposedChildProcessGroup;
}
}
draftState.saving = false;
});
}),
on(flowBannerError, flowSnackbarError, (state) => ({
@ -271,11 +387,13 @@ export const flowReducer = createReducer(
}
});
}),
on(createComponentComplete, (state) => ({
...state,
dragging: false,
saving: false
})),
on(createComponentComplete, (state, { response }) => {
return produce(state, (draftState) => {
draftState.addedCache.push(response.payload.id);
draftState.dragging = false;
draftState.saving = false;
});
}),
on(
updateComponent,
updateProcessor,
@ -337,6 +455,8 @@ export const flowReducer = createReducer(
on(deleteComponentsSuccess, (state, { response }) => {
return produce(state, (draftState) => {
response.forEach((deleteResponse) => {
draftState.removedCache.push(deleteResponse.id);
const collection: any[] | null = getComponentCollection(draftState, deleteResponse.type);
if (collection) {
@ -455,21 +575,6 @@ export const flowReducer = createReducer(
draftState.saving = false;
});
}),
on(loadChildProcessGroupSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const collection: any[] | null = getComponentCollection(draftState, ComponentType.ProcessGroup);
if (collection) {
const componentIndex: number = collection.findIndex((f: any) => response.id === f.id);
if (componentIndex > -1) {
collection[componentIndex] = response;
}
}
draftState.saving = false;
});
}),
on(saveToFlowRegistry, stopVersionControl, (state) => ({
...state,
versionSaving: true
@ -552,3 +657,63 @@ function getComponentCollection(draftState: FlowState, componentType: ComponentT
}
return collection;
}
function processComponentCollection(
proposedComponents: ComponentEntity[],
currentComponents: ComponentEntity[],
addedCache: string[],
removedCache: string[],
overrideRevisionCheck: boolean
): ComponentEntity[] {
// components in the proposed collection but not the current collection
const addedComponents: ComponentEntity[] = proposedComponents.filter((proposedComponent) => {
return !currentComponents.some((currentComponent) => currentComponent.id === proposedComponent.id);
});
// components in the current collection that are no longer in the proposed collection
const removedComponents: ComponentEntity[] = currentComponents.filter((currentComponent) => {
return !proposedComponents.some((proposedComponent) => proposedComponent.id === currentComponent.id);
});
// components that are in both the proposed collection and the current collection
const updatedComponents: ComponentEntity[] = currentComponents.filter((currentComponent) => {
return proposedComponents.some((proposedComponents) => proposedComponents.id === currentComponent.id);
});
const components = updatedComponents.map((currentComponent) => {
const proposedComponent = proposedComponents.find(
(proposedComponent) => proposedComponent.id === currentComponent.id
);
if (proposedComponent) {
// consider newer when the version is greater or equal. when the revision is equal we want to use the proposed component
// because it will contain updated stats/metrics. when the revision is greater it indicates the configuration was updated
const isNewerOrEqualRevision = proposedComponent.revision.version >= currentComponent.revision.version;
// use the proposed component when the revision is newer or equal or if we are overriding the revision check which
// happens when a node cluster connection state changes. when this happens we just accept the proposed component
// because it's revision basis is reset.
if (isNewerOrEqualRevision || overrideRevisionCheck) {
return proposedComponent;
}
}
return currentComponent;
});
addedComponents.forEach((addedComponent) => {
// if an added component is in the removed cache it means that the component was removed during the
// request to load the process group. if it's not in the remove cache we add it to the components
if (!removedCache.includes(addedComponent.id)) {
components.push(addedComponent);
}
});
removedComponents.forEach((removedComponent) => {
// if a removed component is in the added cache it means that the component was added during the
// request to load the process group. if it's in the added cache we add it to the components
if (addedCache.includes(removedComponent.id)) {
components.push(removedComponent);
}
});
return components;
}

View File

@ -67,6 +67,7 @@ export interface LoadProcessGroupResponse {
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
controllerBulletins: ControllerBulletinsEntity;
connectedStateChanged: boolean;
}
export interface LoadConnectionSuccess {
@ -538,6 +539,7 @@ export interface ComponentEntity {
id: string;
permissions: Permissions;
position: Position;
revision: Revision;
component: any;
}
@ -617,6 +619,8 @@ export interface ControllerBulletinsEntity {
export interface FlowState {
id: string;
flow: ProcessGroupFlowEntity;
addedCache: string[];
removedCache: string[];
flowStatus: ControllerStatusEntity;
refreshRpgDetails: RefreshRemoteProcessGroupPollingDetailsRequest | null;
controllerBulletins: ControllerBulletinsEntity;

View File

@ -41,6 +41,8 @@ export const setDisconnectionAcknowledged = createAction(
props<{ disconnectionAcknowledged: boolean }>()
);
export const resetConnectedStateChanged = createAction(`${CLUSTER_SUMMARY_STATE_PREFIX} Reset Connected State Changed`);
export const searchCluster = createAction(
`${CLUSTER_SUMMARY_STATE_PREFIX} Search Cluster`,
props<{ request: ClusterSearchRequest }>()

View File

@ -18,14 +18,17 @@
import { createReducer, on } from '@ngrx/store';
import { ClusterSummaryState } from './index';
import {
acknowledgeClusterConnectionChange,
loadClusterSummary,
loadClusterSummarySuccess,
resetConnectedStateChanged,
searchClusterSuccess,
setDisconnectionAcknowledged
} from './cluster-summary.actions';
export const initialState: ClusterSummaryState = {
disconnectionAcknowledged: false,
connectedStateChanged: false,
clusterSummary: null,
searchResults: null,
status: 'pending'
@ -46,6 +49,14 @@ export const clusterSummaryReducer = createReducer(
...state,
searchResults: response
})),
on(acknowledgeClusterConnectionChange, (state) => ({
...state,
connectedStateChanged: true
})),
on(resetConnectedStateChanged, (state) => ({
...state,
connectedStateChanged: false
})),
on(setDisconnectionAcknowledged, (state, { disconnectionAcknowledged }) => ({
...state,
disconnectionAcknowledged

View File

@ -25,6 +25,11 @@ export const selectDisconnectionAcknowledged = createSelector(
(state: ClusterSummaryState) => state.disconnectionAcknowledged
);
export const selectConnectedStateChanged = createSelector(
selectClusterSummaryState,
(state: ClusterSummaryState) => state.connectedStateChanged
);
export const selectClusterSummary = createSelector(
selectClusterSummaryState,
(state: ClusterSummaryState) => state.clusterSummary

View File

@ -44,6 +44,7 @@ export interface ClusterSearchResults {
export interface ClusterSummaryState {
disconnectionAcknowledged: boolean;
connectedStateChanged: boolean;
clusterSummary: ClusterSummary | null;
searchResults: ClusterSearchResults | null;
status: 'pending' | 'loading' | 'success';