NIFI-12485: Lineage Graph (#8173)

* NIFI-12485:
- Lineage.

* NIFI-12485:
- Adding context menu to lineage graph.
- Refactoring canvas context menu to promote reuse.

* NIFI-12485:
- Lineage timeline slider.

* NIFI-12485:
- Addressing review feedback.

* NIFI-12485:
- Addressing review feedback.

This closes #8173
This commit is contained in:
Matt Gilman 2023-12-21 11:10:11 -05:00 committed by GitHub
parent 02d563eefc
commit e6d09c3b3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 2230 additions and 338 deletions

View File

@ -1,5 +1,4 @@
export default { const target = {
'/nifi-api/*': {
target: 'https://localhost:8443', target: 'https://localhost:8443',
secure: false, secure: false,
logLevel: 'debug', logLevel: 'debug',
@ -7,33 +6,12 @@ export default {
headers: { headers: {
'X-ProxyPort': 4200 'X-ProxyPort': 4200
} }
}, };
'/nifi-docs/*': {
target: 'https://localhost:8443', export default {
secure: false, '/nifi-api/*': target,
logLevel: 'debug', '/nifi-docs/*': target,
changeOrigin: true, '/nifi-content-viewer/*': target,
headers: { // the following entry is needed because the content viewer (and other UIs) load resources from existing nifi ui
'X-ProxyPort': 4200 '/nifi/*': target
}
},
'/nifi-content-viewer/*': {
target: 'https://localhost:8443',
secure: false,
logLevel: 'debug',
changeOrigin: true,
headers: {
'X-ProxyPort': 4200
}
},
// the following entry is needed because the content viewer (and other UIs) load resources from existing nifi ui
'/nifi/*': {
target: 'https://localhost:8443',
secure: false,
logLevel: 'debug',
changeOrigin: true,
headers: {
'X-ProxyPort': 4200
}
}
}; };

View File

@ -15,10 +15,12 @@
* limitations under the License. * limitations under the License.
*/ */
import { Component, Input, OnInit, TemplateRef, ViewChild } from '@angular/core'; import { Injectable } from '@angular/core';
import { Observable, throwError } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { CanvasUtils } from './canvas-utils.service';
import { Store } from '@ngrx/store'; import { Store } from '@ngrx/store';
import { CanvasState } from '../../../state'; import { CanvasState } from '../state';
import { Observable, Subject } from 'rxjs';
import { import {
centerSelectedComponent, centerSelectedComponent,
deleteComponents, deleteComponents,
@ -33,35 +35,19 @@ import {
navigateToProvenanceForComponent, navigateToProvenanceForComponent,
reloadFlow, reloadFlow,
replayLastProvenanceEvent replayLastProvenanceEvent
} from '../../../state/flow/flow.actions'; } from '../state/flow/flow.actions';
import { CanvasUtils } from '../../../service/canvas-utils.service'; import { ComponentType } from '../../../state/shared';
import { DeleteComponentRequest, MoveComponentRequest } from '../../../state/flow'; import { DeleteComponentRequest, MoveComponentRequest } from '../state/flow';
import { ComponentType } from '../../../../../state/shared'; import {
import { AsyncPipe, NgForOf, NgIf } from '@angular/common'; ContextMenu,
import { CdkMenu, CdkMenuItem, CdkMenuTrigger } from '@angular/cdk/menu'; ContextMenuDefinition,
ContextMenuDefinitionProvider,
ContextMenuItemDefinition
} from '../../../ui/common/context-menu/context-menu.component';
import { selection } from 'd3';
export interface ContextMenuItemDefinition { @Injectable({ providedIn: 'root' })
isSeparator?: boolean; export class CanvasContextMenu implements ContextMenuDefinitionProvider {
condition?: Function;
clazz?: string;
text?: string;
subMenuId?: string;
action?: Function;
}
export interface ContextMenuDefinition {
id: string;
menuItems: ContextMenuItemDefinition[];
}
@Component({
selector: 'fd-context-menu',
standalone: true,
templateUrl: './context-menu.component.html',
imports: [NgForOf, AsyncPipe, CdkMenu, CdkMenuItem, NgIf, CdkMenuTrigger],
styleUrls: ['./context-menu.component.scss']
})
export class ContextMenu implements OnInit {
readonly VERSION_MENU = { readonly VERSION_MENU = {
id: 'version', id: 'version',
menuItems: [ menuItems: [
@ -966,12 +952,6 @@ export class ContextMenu implements OnInit {
private allMenus: Map<string, ContextMenuDefinition>; private allMenus: Map<string, ContextMenuDefinition>;
@Input() menuId: string | undefined;
@ViewChild('menu', { static: true }) menu!: TemplateRef<any>;
private showFocused: Subject<boolean> = new Subject();
showFocused$: Observable<boolean> = this.showFocused.asObservable();
constructor( constructor(
private store: Store<CanvasState>, private store: Store<CanvasState>,
private canvasUtils: CanvasUtils private canvasUtils: CanvasUtils
@ -985,79 +965,23 @@ export class ContextMenu implements OnInit {
this.allMenus.set(this.DOWNLOAD.id, this.DOWNLOAD); this.allMenus.set(this.DOWNLOAD.id, this.DOWNLOAD);
} }
getMenuItems(menuId: string | undefined): ContextMenuItemDefinition[] { getMenu(menuId: string): ContextMenuDefinition | undefined {
if (menuId) { return this.allMenus.get(menuId);
const menuDefinition: ContextMenuDefinition | undefined = this.allMenus.get(menuId); }
if (menuDefinition) { filterMenuItem(menuItem: ContextMenuItemDefinition): boolean {
const selection = this.canvasUtils.getSelection(); const selection = this.canvasUtils.getSelection();
// find all applicable menu items for the current selection
let applicableMenuItems = menuDefinition.menuItems.filter((menuItem: ContextMenuItemDefinition) => {
// include if the condition matches // include if the condition matches
if (menuItem.condition) { if (menuItem.condition) {
return menuItem.condition(this.canvasUtils, selection); return menuItem.condition(this.canvasUtils, selection);
} }
// include if the sub menu has items // include if there is no condition (non conditional item, separator, sub menu, etc)
if (menuItem.subMenuId) {
return this.getMenuItems(menuItem.subMenuId).length > 0;
}
return true;
});
// remove any extra separators
applicableMenuItems = applicableMenuItems.filter(
(menuItem: ContextMenuItemDefinition, index: number) => {
if (menuItem.isSeparator && index > 0) {
// cannot have two consecutive separators
return !applicableMenuItems[index - 1].isSeparator;
}
return true; return true;
} }
);
return applicableMenuItems.filter((menuItem: ContextMenuItemDefinition, index: number) => { menuItemClicked(menuItem: ContextMenuItemDefinition, event: MouseEvent): void {
if (menuItem.isSeparator) {
// a separator cannot be first
if (index === 0) {
return false;
}
// a separator cannot be last
if (index >= applicableMenuItems.length - 1) {
return false;
}
}
return true;
});
} else {
return [];
}
}
return [];
}
hasSubMenu(menuItemDefinition: ContextMenuItemDefinition): boolean {
return !!menuItemDefinition.subMenuId;
}
keydown(event: KeyboardEvent): void {
// TODO - Currently the first item in the context menu is auto focused. By default, this is rendered with an
// outline. This appears to be an issue with the cdkMenu/cdkMenuItem so we are working around it by manually
// overriding styles.
this.showFocused.next(true);
}
ngOnInit(): void {
this.showFocused.next(false);
}
menuItemClicked(menuItem: ContextMenuItemDefinition, event: MouseEvent) {
if (menuItem.action) { if (menuItem.action) {
const selection = this.canvasUtils.getSelection(); const selection = this.canvasUtils.getSelection();
menuItem.action(this.store, selection, this.canvasUtils, event); menuItem.action(this.store, selection, this.canvasUtils, event);

View File

@ -61,6 +61,8 @@ import {
Loading Flow Loading Flow
*/ */
export const resetState = createAction('[Canvas] Reset State');
export const reloadFlow = createAction('[Canvas] Reload Flow'); export const reloadFlow = createAction('[Canvas] Reload Flow');
export const leaveProcessGroup = createAction('[Canvas] Leave Process Group'); export const leaveProcessGroup = createAction('[Canvas] Leave Process Group');
@ -152,7 +154,7 @@ export const removeSelectedComponents = createAction(
props<{ request: SelectComponentsRequest }>() props<{ request: SelectComponentsRequest }>()
); );
export const centerSelectedComponent = createAction('[Canvas] Center Selected Components'); export const centerSelectedComponent = createAction('[Canvas] Center Selected Component');
/* /*
Create Component Actions Create Component Actions

View File

@ -67,7 +67,6 @@ import { CreatePort } from '../../ui/canvas/items/port/create-port/create-port.c
import { EditPort } from '../../ui/canvas/items/port/edit-port/edit-port.component'; import { EditPort } from '../../ui/canvas/items/port/edit-port/edit-port.component';
import { import {
ComponentType, ComponentType,
ControllerServiceReferencingComponent,
EditParameterRequest, EditParameterRequest,
EditParameterResponse, EditParameterResponse,
InlineServiceCreationRequest, InlineServiceCreationRequest,
@ -1135,13 +1134,14 @@ export class FlowEffects {
this.actions$.pipe( this.actions$.pipe(
ofType(FlowActions.openEditProcessGroupDialog), ofType(FlowActions.openEditProcessGroupDialog),
map((action) => action.request), map((action) => action.request),
switchMap((action) => withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, currentProcessGroupId]) =>
this.flowService.getParameterContexts().pipe( this.flowService.getParameterContexts().pipe(
take(1), take(1),
map((response) => [action, response.parameterContexts]) map((response) => [request, response.parameterContexts, currentProcessGroupId])
) )
), ),
tap(([request, parameterContexts]) => { tap(([request, parameterContexts, currentProcessGroupId]) => {
const editDialogReference = this.dialog.open(EditProcessGroup, { const editDialogReference = this.dialog.open(EditProcessGroup, {
data: request, data: request,
panelClass: 'large-dialog' panelClass: 'large-dialog'
@ -1167,6 +1167,15 @@ export class FlowEffects {
editDialogReference.afterClosed().subscribe(() => { editDialogReference.afterClosed().subscribe(() => {
this.store.dispatch(FlowActions.clearFlowApiError()); this.store.dispatch(FlowActions.clearFlowApiError());
if (request.entity.id === currentProcessGroupId) {
this.store.dispatch(
FlowActions.enterProcessGroup({
request: {
id: currentProcessGroupId
}
})
);
} else {
this.store.dispatch( this.store.dispatch(
FlowActions.selectComponents({ FlowActions.selectComponents({
request: { request: {
@ -1179,6 +1188,7 @@ export class FlowEffects {
} }
}) })
); );
}
}); });
}) })
), ),

View File

@ -37,6 +37,7 @@ import {
loadProcessorSuccess, loadProcessorSuccess,
loadRemoteProcessGroupSuccess, loadRemoteProcessGroupSuccess,
navigateWithoutTransform, navigateWithoutTransform,
resetState,
setDragging, setDragging,
setNavigationCollapsed, setNavigationCollapsed,
setOperationCollapsed, setOperationCollapsed,
@ -138,6 +139,9 @@ export const initialState: FlowState = {
export const flowReducer = createReducer( export const flowReducer = createReducer(
initialState, initialState,
on(resetState, (state) => ({
...initialState
})),
on(loadProcessGroup, (state, { request }) => ({ on(loadProcessGroup, (state, { request }) => ({
...state, ...state,
transitionRequired: request.transitionRequired, transitionRequired: request.transitionRequired,

View File

@ -113,7 +113,7 @@ export const selectSingleEditedComponent = createSelector(selectCurrentRoute, (r
export const selectEditedCurrentProcessGroup = createSelector(selectCurrentRoute, (route) => { export const selectEditedCurrentProcessGroup = createSelector(selectCurrentRoute, (route) => {
if (route?.routeConfig?.path == 'edit') { if (route?.routeConfig?.path == 'edit') {
if (route.params.ids == null && route.params.type == null) { if (route.params.ids == null && route.params.id == null && route.params.type == null) {
return route.params.processGroupId; return route.params.processGroupId;
} }
} }

View File

@ -19,7 +19,7 @@
<fd-header></fd-header> <fd-header></fd-header>
<div class="flex-1"> <div class="flex-1">
<div id="canvas-container" class="canvas-background h-full" [cdkContextMenuTriggerFor]="contextMenu.menu"></div> <div id="canvas-container" class="canvas-background h-full" [cdkContextMenuTriggerFor]="contextMenu.menu"></div>
<fd-context-menu #contextMenu menuId="root"></fd-context-menu> <fd-context-menu #contextMenu [menuProvider]="canvasContextMenu" menuId="root"></fd-context-menu>
<graph-controls></graph-controls> <graph-controls></graph-controls>
</div> </div>
<fd-footer></fd-footer> <fd-footer></fd-footer>

View File

@ -20,7 +20,7 @@ import { ComponentFixture, TestBed } from '@angular/core/testing';
import { Canvas } from './canvas.component'; import { Canvas } from './canvas.component';
import { provideMockStore } from '@ngrx/store/testing'; import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/flow/flow.reducer'; import { initialState } from '../../state/flow/flow.reducer';
import { ContextMenu } from './context-menu/context-menu.component'; import { ContextMenu } from '../../../../ui/common/context-menu/context-menu.component';
import { Component } from '@angular/core'; import { Component } from '@angular/core';
import { CdkContextMenuTrigger } from '@angular/cdk/menu'; import { CdkContextMenuTrigger } from '@angular/cdk/menu';
import { selectBreadcrumbs } from '../../state/flow/flow.selectors'; import { selectBreadcrumbs } from '../../state/flow/flow.selectors';

View File

@ -25,6 +25,7 @@ import {
editComponent, editComponent,
editCurrentProcessGroup, editCurrentProcessGroup,
loadProcessGroup, loadProcessGroup,
resetState,
selectComponents, selectComponents,
setSkipTransform, setSkipTransform,
startProcessGroupPolling, startProcessGroupPolling,
@ -58,6 +59,8 @@ import { filter, map, switchMap, take, withLatestFrom } from 'rxjs';
import { restoreViewport, zoomFit } from '../../state/transform/transform.actions'; import { restoreViewport, zoomFit } from '../../state/transform/transform.actions';
import { ComponentType } from '../../../../state/shared'; import { ComponentType } from '../../../../state/shared';
import { initialState } from '../../state/flow/flow.reducer'; import { initialState } from '../../state/flow/flow.reducer';
import { ContextMenuDefinitionProvider } from '../../../../ui/common/context-menu/context-menu.component';
import { CanvasContextMenu } from '../../service/canvas-context-menu.service';
@Component({ @Component({
selector: 'fd-canvas', selector: 'fd-canvas',
@ -74,7 +77,8 @@ export class Canvas implements OnInit, OnDestroy {
constructor( constructor(
private viewContainerRef: ViewContainerRef, private viewContainerRef: ViewContainerRef,
private store: Store<CanvasState>, private store: Store<CanvasState>,
private canvasView: CanvasView private canvasView: CanvasView,
public canvasContextMenu: CanvasContextMenu
) { ) {
this.store this.store
.select(selectTransform) .select(selectTransform)
@ -558,6 +562,7 @@ export class Canvas implements OnInit, OnDestroy {
} }
ngOnDestroy(): void { ngOnDestroy(): void {
this.store.dispatch(resetState());
this.store.dispatch(stopProcessGroupPolling()); this.store.dispatch(stopProcessGroupPolling());
} }
} }

View File

@ -18,7 +18,7 @@
import { NgModule } from '@angular/core'; import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common'; import { CommonModule } from '@angular/common';
import { Canvas } from './canvas.component'; import { Canvas } from './canvas.component';
import { ContextMenu } from './context-menu/context-menu.component'; import { ContextMenu } from '../../../../ui/common/context-menu/context-menu.component';
import { CdkContextMenuTrigger, CdkMenu, CdkMenuItem, CdkMenuTrigger } from '@angular/cdk/menu'; import { CdkContextMenuTrigger, CdkMenu, CdkMenuItem, CdkMenuTrigger } from '@angular/cdk/menu';
import { GraphControls } from './graph-controls/graph-controls.component'; import { GraphControls } from './graph-controls/graph-controls.component';
import { CanvasRoutingModule } from './canvas-routing.module'; import { CanvasRoutingModule } from './canvas-routing.module';

View File

@ -24,9 +24,6 @@ const routes: Routes = [
path: '', path: '',
component: Provenance, component: Provenance,
children: [ children: [
// {
// path: 'lineage'
// },
{ {
path: '', path: '',
loadChildren: () => loadChildren: () =>

View File

@ -24,6 +24,7 @@ import { ProvenanceRoutingModule } from './provenance-routing.module';
import { provenanceFeatureKey, reducers } from '../state'; import { provenanceFeatureKey, reducers } from '../state';
import { ProvenanceEventListingEffects } from '../state/provenance-event-listing/provenance-event-listing.effects'; import { ProvenanceEventListingEffects } from '../state/provenance-event-listing/provenance-event-listing.effects';
import { MatDialogModule } from '@angular/material/dialog'; import { MatDialogModule } from '@angular/material/dialog';
import { LineageEffects } from '../state/lineage/lineage.effects';
@NgModule({ @NgModule({
declarations: [Provenance], declarations: [Provenance],
@ -33,7 +34,7 @@ import { MatDialogModule } from '@angular/material/dialog';
MatDialogModule, MatDialogModule,
ProvenanceRoutingModule, ProvenanceRoutingModule,
StoreModule.forFeature(provenanceFeatureKey, reducers), StoreModule.forFeature(provenanceFeatureKey, reducers),
EffectsModule.forFeature(ProvenanceEventListingEffects) EffectsModule.forFeature(ProvenanceEventListingEffects, LineageEffects)
] ]
}) })
export class ProvenanceModule {} export class ProvenanceModule {}

View File

@ -20,6 +20,7 @@ import { Observable, throwError } from 'rxjs';
import { HttpClient } from '@angular/common/http'; import { HttpClient } from '@angular/common/http';
import { NiFiCommon } from '../../../service/nifi-common.service'; import { NiFiCommon } from '../../../service/nifi-common.service';
import { ProvenanceRequest } from '../state/provenance-event-listing'; import { ProvenanceRequest } from '../state/provenance-event-listing';
import { LineageRequest } from '../state/lineage';
@Injectable({ providedIn: 'root' }) @Injectable({ providedIn: 'root' })
export class ProvenanceService { export class ProvenanceService {
@ -127,4 +128,18 @@ export class ProvenanceService {
return this.httpClient.post(`${ProvenanceService.API}/provenance-events/replays`, payload); return this.httpClient.post(`${ProvenanceService.API}/provenance-events/replays`, payload);
} }
submitLineageQuery(request: LineageRequest): Observable<any> {
return this.httpClient.post(`${ProvenanceService.API}/provenance/lineage`, { lineage: { request } });
}
getLineageQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.get(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`);
}
deleteLineageQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.delete(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`);
}
} }

View File

@ -22,16 +22,20 @@
import { Action, combineReducers, createFeatureSelector } from '@ngrx/store'; import { Action, combineReducers, createFeatureSelector } from '@ngrx/store';
import { provenanceEventListingFeatureKey, ProvenanceEventListingState } from './provenance-event-listing'; import { provenanceEventListingFeatureKey, ProvenanceEventListingState } from './provenance-event-listing';
import { provenanceEventListingReducer } from './provenance-event-listing/provenance-event-listing.reducer'; import { provenanceEventListingReducer } from './provenance-event-listing/provenance-event-listing.reducer';
import { lineageFeatureKey, LineageState } from './lineage';
import { lineageReducer } from './lineage/lineage.reducer';
export const provenanceFeatureKey = 'provenance'; export const provenanceFeatureKey = 'provenance';
export interface ProvenanceState { export interface ProvenanceState {
[provenanceEventListingFeatureKey]: ProvenanceEventListingState; [provenanceEventListingFeatureKey]: ProvenanceEventListingState;
[lineageFeatureKey]: LineageState;
} }
export function reducers(state: ProvenanceState | undefined, action: Action) { export function reducers(state: ProvenanceState | undefined, action: Action) {
return combineReducers({ return combineReducers({
[provenanceEventListingFeatureKey]: provenanceEventListingReducer [provenanceEventListingFeatureKey]: provenanceEventListingReducer,
[lineageFeatureKey]: lineageReducer
})(state, action); })(state, action);
} }

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export const lineageFeatureKey = 'lineageGraph';
export interface LineageRequest {
eventId?: string;
lineageRequestType: 'PARENTS' | 'CHILDREN' | 'FLOWFILE';
uuid?: string;
clusterNodeId?: string;
}
export interface LineageQueryResponse {
lineage: Lineage;
}
export interface LineageNode {
id: string;
flowFileUuid: string;
parentUuids: string[];
childUuids: string[];
clusterNodeIdentifier: string;
type: string;
eventType: string;
millis: number;
timestamp: string;
}
export interface LineageLink {
sourceId: string;
targetId: string;
flowFileUuid: string;
timestamp: string;
millis: number;
}
export interface LineageResults {
errors?: string[];
nodes: LineageNode[];
links: LineageLink[];
}
export interface Lineage {
id: string;
uri: string;
submissionTime: string;
expiration: string;
percentCompleted: number;
finished: boolean;
request: LineageRequest;
results: LineageResults;
}
export interface LineageState {
lineage: Lineage | null;
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createAction, props } from '@ngrx/store';
import { LineageQueryResponse, LineageRequest } from './index';
export const resetLineage = createAction('[Lineage] Reset Lineage');
export const submitLineageQuery = createAction('[Lineage] Submit Lineage Query', props<{ request: LineageRequest }>());
export const submitLineageQuerySuccess = createAction(
'[Lineage] Submit Lineage Query Success',
props<{ response: LineageQueryResponse }>()
);
export const startPollingLineageQuery = createAction('[Lineage] Start Polling Lineage Query');
export const pollLineageQuery = createAction('[Lineage] Poll Lineage Query');
export const pollLineageQuerySuccess = createAction(
'[Lineage] Poll Lineage Query Success',
props<{ response: LineageQueryResponse }>()
);
export const stopPollingLineageQuery = createAction('[Lineage] Stop Polling Lineage Query');
export const deleteLineageQuery = createAction('[Lineage] Delete Lineage Query');
export const lineageApiError = createAction(
'[Lineage] Load Parameter Context Listing Error',
props<{ error: string }>()
);

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import * as LineageActions from './lineage.actions';
import * as ProvenanceActions from '../provenance-event-listing/provenance-event-listing.actions';
import {
asyncScheduler,
catchError,
from,
interval,
map,
NEVER,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { ProvenanceService } from '../../service/provenance.service';
import { Lineage } from './index';
import { selectClusterNodeId } from '../provenance-event-listing/provenance-event-listing.selectors';
import { selectLineageId } from './lineage.selectors';
@Injectable()
export class LineageEffects {
constructor(
private actions$: Actions,
private store: Store<NiFiState>,
private provenanceService: ProvenanceService,
private dialog: MatDialog
) {}
submitLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.submitLineageQuery),
map((action) => action.request),
switchMap((request) =>
from(this.provenanceService.submitLineageQuery(request)).pipe(
map((response) =>
LineageActions.submitLineageQuerySuccess({
response: {
lineage: response.lineage
}
})
),
catchError((error) => {
this.store.dispatch(
ProvenanceActions.showOkDialog({
title: 'Error',
message: error.error
})
);
return of(
LineageActions.lineageApiError({
error: error.error
})
);
})
)
)
)
);
submitLineageQuerySuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.submitLineageQuerySuccess),
map((action) => action.response),
switchMap((response) => {
const query: Lineage = response.lineage;
if (query.finished) {
this.dialog.closeAll();
return of(LineageActions.deleteLineageQuery());
} else {
return of(LineageActions.startPollingLineageQuery());
}
})
)
);
startPollingLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.startPollingLineageQuery),
switchMap(() =>
interval(2000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(LineageActions.stopPollingLineageQuery)))
)
),
switchMap(() => of(LineageActions.pollLineageQuery()))
)
);
pollLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.pollLineageQuery),
withLatestFrom(this.store.select(selectLineageId), this.store.select(selectClusterNodeId)),
switchMap(([action, id, clusterNodeId]) => {
if (id) {
return from(this.provenanceService.getLineageQuery(id, clusterNodeId)).pipe(
map((response) =>
LineageActions.pollLineageQuerySuccess({
response: {
lineage: response.lineage
}
})
),
catchError((error) =>
of(
LineageActions.lineageApiError({
error: error.error
})
)
)
);
} else {
return NEVER;
}
})
)
);
pollLineageQuerySuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.pollLineageQuerySuccess),
map((action) => action.response),
switchMap((response) => {
const query: Lineage = response.lineage;
if (query.finished) {
this.dialog.closeAll();
return of(LineageActions.stopPollingLineageQuery());
} else {
return NEVER;
}
})
)
);
stopPollingLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.stopPollingLineageQuery),
switchMap((response) => of(LineageActions.deleteLineageQuery()))
)
);
deleteLineageQuery$ = createEffect(
() =>
this.actions$.pipe(
ofType(LineageActions.deleteLineageQuery),
withLatestFrom(this.store.select(selectLineageId), this.store.select(selectClusterNodeId)),
tap(([action, id, clusterNodeId]) => {
if (id) {
this.provenanceService.deleteLineageQuery(id, clusterNodeId).subscribe();
}
})
),
{ dispatch: false }
);
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createReducer, on } from '@ngrx/store';
import { LineageState } from './index';
import {
lineageApiError,
pollLineageQuerySuccess,
resetLineage,
submitLineageQuery,
submitLineageQuerySuccess
} from './lineage.actions';
export const initialState: LineageState = {
lineage: null,
error: null,
status: 'pending'
};
export const lineageReducer = createReducer(
initialState,
on(resetLineage, (state) => ({
...initialState
})),
on(submitLineageQuery, (state) => ({
...state,
status: 'loading' as const
})),
on(submitLineageQuerySuccess, pollLineageQuerySuccess, (state, { response }) => ({
...state,
lineage: response.lineage,
error: null,
status: 'success' as const
})),
on(lineageApiError, (state, { error }) => ({
...state,
error,
status: 'error' as const
}))
);

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createSelector } from '@ngrx/store';
import { ProvenanceState, selectProvenanceState } from '../index';
import { Lineage, lineageFeatureKey, LineageState } from './index';
export const selectLineageState = createSelector(
selectProvenanceState,
(state: ProvenanceState) => state[lineageFeatureKey]
);
export const selectStatus = createSelector(selectLineageState, (state: LineageState) => state.status);
export const selectLineage = createSelector(selectLineageState, (state: LineageState) => state.lineage);
export const selectLineageId = createSelector(selectLineage, (state: Lineage | null) => state?.id);

View File

@ -32,6 +32,17 @@ export interface ProvenanceQueryResponse {
provenance: Provenance; provenance: Provenance;
} }
export interface ProvenanceEventRequest {
id: string;
clusterNodeId?: string;
}
export interface GoToProvenanceEventSourceRequest {
eventId?: string;
componentId?: string;
groupId?: string;
}
export interface SearchableField { export interface SearchableField {
field: string; field: string;
id: string; id: string;
@ -93,7 +104,6 @@ export interface ProvenanceEventListingState {
options: ProvenanceOptions | null; options: ProvenanceOptions | null;
request: ProvenanceRequest | null; request: ProvenanceRequest | null;
provenance: Provenance | null; provenance: Provenance | null;
saving: boolean;
loadedTimestamp: string; loadedTimestamp: string;
error: string | null; error: string | null;
status: 'pending' | 'loading' | 'error' | 'success'; status: 'pending' | 'loading' | 'error' | 'success';

View File

@ -16,7 +16,15 @@
*/ */
import { createAction, props } from '@ngrx/store'; import { createAction, props } from '@ngrx/store';
import { ProvenanceOptionsResponse, ProvenanceQueryResponse, ProvenanceRequest } from './index'; import {
GoToProvenanceEventSourceRequest,
ProvenanceEventRequest,
ProvenanceOptionsResponse,
ProvenanceQueryResponse,
ProvenanceRequest
} from './index';
export const resetProvenanceState = createAction('[Provenance Event Listing] Reset Provenance State');
export const loadProvenanceOptions = createAction('[Provenance Event Listing] Load Provenance Options'); export const loadProvenanceOptions = createAction('[Provenance Event Listing] Load Provenance Options');
@ -54,13 +62,18 @@ export const stopPollingProvenanceQuery = createAction('[Provenance Event Listin
export const deleteProvenanceQuery = createAction('[Provenance Event Listing] Delete Provenance Query'); export const deleteProvenanceQuery = createAction('[Provenance Event Listing] Delete Provenance Query');
export const provenanceApiError = createAction( export const provenanceApiError = createAction(
'[Provenance Event Listing] Load Parameter Context Listing Error', '[Provenance Event Listing] Provenance Api Error',
props<{ error: string }>() props<{ error: string }>()
); );
export const openProvenanceEventDialog = createAction( export const openProvenanceEventDialog = createAction(
'[Provenance Event Listing] Open Provenance Event Dialog', '[Provenance Event Listing] Open Provenance Event Dialog',
props<{ id: string }>() props<{ request: ProvenanceEventRequest }>()
);
export const goToProvenanceEventSource = createAction(
'[Provenance Event Listing] Go To Provenance Event Source',
props<{ request: GoToProvenanceEventSourceRequest }>()
); );
export const openSearchDialog = createAction('[Provenance Event Listing] Open Search Dialog'); export const openSearchDialog = createAction('[Provenance Event Listing] Open Search Dialog');

View File

@ -287,6 +287,7 @@ export class ProvenanceEventListingEffects {
() => () =>
this.actions$.pipe( this.actions$.pipe(
ofType(ProvenanceEventListingActions.openProvenanceEventDialog), ofType(ProvenanceEventListingActions.openProvenanceEventDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout)), withLatestFrom(this.store.select(selectAbout)),
tap(([request, about]) => { tap(([request, about]) => {
this.provenanceService.getProvenanceEvent(request.id).subscribe({ this.provenanceService.getProvenanceEvent(request.id).subscribe({
@ -342,6 +343,25 @@ export class ProvenanceEventListingEffects {
{ dispatch: false } { dispatch: false }
); );
goToProvenanceEventSource$ = createEffect(
() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.goToProvenanceEventSource),
map((action) => action.request),
tap((request) => {
if (request.eventId) {
this.provenanceService.getProvenanceEvent(request.eventId).subscribe((response) => {
const event: any = response.provenanceEvent;
this.router.navigate(this.getEventComponentLink(event.groupId, event.componentId));
});
} else if (request.groupId && request.componentId) {
this.router.navigate(this.getEventComponentLink(request.groupId, request.componentId));
}
})
),
{ dispatch: false }
);
showOkDialog$ = createEffect( showOkDialog$ = createEffect(
() => () =>
this.actions$.pipe( this.actions$.pipe(
@ -358,4 +378,20 @@ export class ProvenanceEventListingEffects {
), ),
{ dispatch: false } { dispatch: false }
); );
private getEventComponentLink(groupId: string, componentId: string): string[] {
let link: string[];
if (groupId == componentId) {
link = ['/process-groups', componentId];
} else if (componentId === 'Connection' || componentId === 'Load Balanced Connection') {
link = ['/process-groups', groupId, 'Connection', componentId];
} else if (componentId === 'Output Port') {
link = ['/process-groups', groupId, 'OutputPort', componentId];
} else {
link = ['/process-groups', groupId, 'Processor', componentId];
}
return link;
}
} }

View File

@ -22,6 +22,7 @@ import {
loadProvenanceOptionsSuccess, loadProvenanceOptionsSuccess,
pollProvenanceQuerySuccess, pollProvenanceQuerySuccess,
provenanceApiError, provenanceApiError,
resetProvenanceState,
saveProvenanceRequest, saveProvenanceRequest,
submitProvenanceQuery, submitProvenanceQuery,
submitProvenanceQuerySuccess submitProvenanceQuerySuccess
@ -31,7 +32,6 @@ export const initialState: ProvenanceEventListingState = {
options: null, options: null,
request: null, request: null,
provenance: null, provenance: null,
saving: false,
loadedTimestamp: '', loadedTimestamp: '',
error: null, error: null,
status: 'pending' status: 'pending'
@ -39,6 +39,9 @@ export const initialState: ProvenanceEventListingState = {
export const provenanceEventListingReducer = createReducer( export const provenanceEventListingReducer = createReducer(
initialState, initialState,
on(resetProvenanceState, (state) => ({
...initialState
})),
on(loadProvenanceOptionsSuccess, (state, { response }) => ({ on(loadProvenanceOptionsSuccess, (state, { response }) => ({
...state, ...state,
options: response.provenanceOptions options: response.provenanceOptions
@ -64,7 +67,6 @@ export const provenanceEventListingReducer = createReducer(
})), })),
on(provenanceApiError, (state, { error }) => ({ on(provenanceApiError, (state, { error }) => ({
...state, ...state,
saving: false,
error, error,
status: 'error' as const status: 'error' as const
})) }))

View File

@ -15,29 +15,28 @@
~ limitations under the License. ~ limitations under the License.
--> -->
<div class="flex flex-col h-full gap-y-2 text-sm" *ngIf="status$ | async; let status"> <div class="flex flex-col h-full text-sm" *ngIf="status$ | async; let status">
<div class="flex-1"> <div class="flex-1">
<ng-container *ngIf="provenance$ | async as provenance; else initialLoading"> <ng-container *ngIf="provenance$ | async as provenance; else initialLoading">
<provenance-event-table <provenance-event-table
[loading]="status === 'loading'"
[loadedTimestamp]="(loadedTimestamp$ | async)!"
[events]="provenance.results.provenanceEvents" [events]="provenance.results.provenanceEvents"
[oldestEventAvailable]="provenance.results.oldestEvent" [oldestEventAvailable]="provenance.results.oldestEvent"
[timeOffset]="provenance.results.timeOffset"
[resultsMessage]="getResultsMessage(provenance)" [resultsMessage]="getResultsMessage(provenance)"
[hasRequest]="hasRequest(provenance.request)" [hasRequest]="hasRequest(provenance.request)"
[lineage$]="lineage$"
(openSearchCriteria)="openSearchCriteria()" (openSearchCriteria)="openSearchCriteria()"
(openEventDialog)="openEventDialog($event)" (openEventDialog)="openEventDialog($event)"
(clearRequest)="clearRequest()"></provenance-event-table> (goToProvenanceEventSource)="goToEventSource($event)"
(resubmitProvenanceQuery)="resubmitProvenanceQuery()"
(clearRequest)="clearRequest()"
(queryLineage)="queryLineage($event)"
(resetLineage)="resetLineage()"></provenance-event-table>
</ng-container> </ng-container>
<ng-template #initialLoading> <ng-template #initialLoading>
<ngx-skeleton-loader count="3"></ngx-skeleton-loader> <ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</ng-template> </ng-template>
</div> </div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshParameterContextListing()">
<i class="fa fa-refresh" [class.fa-spin]="(status$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div> </div>

View File

@ -15,11 +15,13 @@
* limitations under the License. * limitations under the License.
*/ */
import { Component } from '@angular/core'; import { Component, OnDestroy } from '@angular/core';
import { Store } from '@ngrx/store'; import { Store } from '@ngrx/store';
import { import {
GoToProvenanceEventSourceRequest,
Provenance, Provenance,
ProvenanceEventListingState, ProvenanceEventListingState,
ProvenanceEventRequest,
ProvenanceRequest, ProvenanceRequest,
ProvenanceResults ProvenanceResults
} from '../../state/provenance-event-listing'; } from '../../state/provenance-event-listing';
@ -34,25 +36,32 @@ import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { filter, map, take, tap } from 'rxjs'; import { filter, map, take, tap } from 'rxjs';
import { import {
clearProvenanceRequest, clearProvenanceRequest,
goToProvenanceEventSource,
openProvenanceEventDialog, openProvenanceEventDialog,
openSearchDialog, openSearchDialog,
resetProvenanceState,
resubmitProvenanceQuery, resubmitProvenanceQuery,
saveProvenanceRequest saveProvenanceRequest
} from '../../state/provenance-event-listing/provenance-event-listing.actions'; } from '../../state/provenance-event-listing/provenance-event-listing.actions';
import { ProvenanceSearchDialog } from './provenance-search-dialog/provenance-search-dialog.component'; import { ProvenanceSearchDialog } from './provenance-search-dialog/provenance-search-dialog.component';
import { ProvenanceEventSummary } from '../../../../state/shared'; import { ProvenanceEventSummary } from '../../../../state/shared';
import { resetLineage, submitLineageQuery } from '../../state/lineage/lineage.actions';
import { LineageRequest } from '../../state/lineage';
import { selectLineage } from '../../state/lineage/lineage.selectors';
@Component({ @Component({
selector: 'provenance-event-listing', selector: 'provenance-event-listing',
templateUrl: './provenance-event-listing.component.html', templateUrl: './provenance-event-listing.component.html',
styleUrls: ['./provenance-event-listing.component.scss'] styleUrls: ['./provenance-event-listing.component.scss']
}) })
export class ProvenanceEventListing { export class ProvenanceEventListing implements OnDestroy {
status$ = this.store.select(selectStatus); status$ = this.store.select(selectStatus);
loadedTimestamp$ = this.store.select(selectLoadedTimestamp); loadedTimestamp$ = this.store.select(selectLoadedTimestamp);
provenance$ = this.store.select(selectProvenance); provenance$ = this.store.select(selectProvenance);
lineage$ = this.store.select(selectLineage);
request!: ProvenanceRequest; request!: ProvenanceRequest;
stateReset: boolean = false;
constructor(private store: Store<ProvenanceEventListingState>) { constructor(private store: Store<ProvenanceEventListingState>) {
this.store this.store
@ -115,6 +124,7 @@ export class ProvenanceEventListing {
return initialRequest; return initialRequest;
}), }),
filter(() => !this.stateReset),
tap((request) => (this.request = request)), tap((request) => (this.request = request)),
takeUntilDestroyed() takeUntilDestroyed()
) )
@ -159,19 +169,45 @@ export class ProvenanceEventListing {
this.store.dispatch(openSearchDialog()); this.store.dispatch(openSearchDialog());
} }
openEventDialog(event: ProvenanceEventSummary): void { openEventDialog(request: ProvenanceEventRequest): void {
this.store.dispatch( this.store.dispatch(
openProvenanceEventDialog({ openProvenanceEventDialog({
id: event.id request
}) })
); );
} }
refreshParameterContextListing(): void { goToEventSource(request: GoToProvenanceEventSourceRequest): void {
this.store.dispatch(
goToProvenanceEventSource({
request
})
);
}
resubmitProvenanceQuery(): void {
this.store.dispatch( this.store.dispatch(
resubmitProvenanceQuery({ resubmitProvenanceQuery({
request: this.request request: this.request
}) })
); );
} }
queryLineage(request: LineageRequest): void {
this.store.dispatch(
submitLineageQuery({
request
})
);
}
resetLineage(): void {
this.store.dispatch(resetLineage());
}
ngOnDestroy(): void {
this.stateReset = true;
this.store.dispatch(resetProvenanceState());
this.store.dispatch(resetLineage());
}
} }

View File

@ -0,0 +1,19 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div id="lineage" [cdkContextMenuTriggerFor]="contextMenu.menu"></div>
<fd-context-menu #contextMenu [menuProvider]="lineageContextmenu" menuId="root"></fd-context-menu>

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
:host ::ng-deep #lineage {
width: 100%;
height: 100%;
canvas,
svg {
position: absolute;
overflow: hidden;
path.link.selected {
stroke: #ba554a;
}
g.event {
cursor: pointer;
user-select: none;
}
g.event circle.selected {
fill: #ba554a;
}
g.event circle.context {
fill: #cf9f5d;
}
text.event-type {
font-family: Roboto;
font-size: 11px;
font-style: normal;
font-weight: normal;
}
text.event-type.expand-parents,
text.event-type.expand-children {
font-weight: 500;
font-family: 'Roboto';
font-style: normal;
font-size: 13px;
}
g.flowfile {
cursor: default;
user-select: none;
}
g.flowfile circle.context,
g.event circle.context {
stroke: #004849;
stroke-width: 1.5px;
}
.flowfile-icon {
font-family: flowfont;
content: '\e808';
font-size: 16px;
color: #ad9897;
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { LineageComponent } from './lineage.component';
import SpyObj = jasmine.SpyObj;
import createSpyObj = jasmine.createSpyObj;
describe('LineageComponent', () => {
let component: LineageComponent;
let fixture: ComponentFixture<LineageComponent>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [LineageComponent]
});
fixture = TestBed.createComponent(LineageComponent);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,997 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, DestroyRef, EventEmitter, inject, Input, OnInit, Output } from '@angular/core';
import * as d3 from 'd3';
import { Lineage, LineageLink, LineageNode, LineageRequest } from '../../../../state/lineage';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { GoToProvenanceEventSourceRequest, ProvenanceEventRequest } from '../../../../state/provenance-event-listing';
import {
ContextMenu,
ContextMenuDefinition,
ContextMenuDefinitionProvider,
ContextMenuItemDefinition
} from '../../../../../../ui/common/context-menu/context-menu.component';
import { CdkContextMenuTrigger } from '@angular/cdk/menu';
@Component({
selector: 'lineage',
standalone: true,
templateUrl: './lineage.component.html',
imports: [ContextMenu, CdkContextMenuTrigger],
styleUrls: ['./lineage.component.scss']
})
export class LineageComponent implements OnInit {
private static readonly DEFAULT_NODE_SPACING: number = 100;
private static readonly DEFAULT_LEVEL_DIFFERENCE: number = 120;
private destroyRef = inject(DestroyRef);
@Input() set lineage(lineage: Lineage) {
if (lineage && lineage.finished) {
this.addLineage(lineage.results.nodes, lineage.results.links);
}
}
@Input() eventId: string | null = null;
@Input() set eventTimestampThreshold(eventTimestampThreshold: number) {
if (this.previousEventTimestampThreshold >= 0) {
let nodes: any = this.lineageContainerElement.selectAll('g.node.rendered');
let links: any = this.lineageContainerElement.selectAll('path.link.rendered');
if (this.previousEventTimestampThreshold > eventTimestampThreshold) {
// the threshold is descending
// determine the nodes to hide
const nodesToHide = nodes.filter((d: any) => {
return d.millis > eventTimestampThreshold && d.millis <= this.previousEventTimestampThreshold;
});
const linksToHide = links.filter((d: any) => {
return d.millis > eventTimestampThreshold && d.millis <= this.previousEventTimestampThreshold;
});
// hide applicable nodes and lines
nodesToHide.transition().delay(200).duration(400).style('opacity', 0).attr('pointer-events', 'none');
linksToHide.transition().duration(400).style('opacity', 0).attr('pointer-events', 'none');
} else {
// the threshold is ascending
// determine the nodes to show
const nodesToShow = nodes.filter((d: any) => {
return d.millis <= eventTimestampThreshold && d.millis > this.previousEventTimestampThreshold;
});
const linksToShow = links.filter((d: any) => {
return d.millis <= eventTimestampThreshold && d.millis > this.previousEventTimestampThreshold;
});
// show applicable nodes and lines
linksToShow.transition().delay(200).duration(400).style('opacity', 1).attr('pointer-events', 'all');
nodesToShow.transition().duration(400).style('opacity', 1).attr('pointer-events', 'all');
}
}
this.previousEventTimestampThreshold = eventTimestampThreshold;
}
@Input() set reset(reset: EventEmitter<void>) {
reset.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(() => {
this.nodeLookup.clear();
this.linkLookup.clear();
let nodes: any = this.lineageContainerElement.selectAll('g.node');
let links: any = this.lineageContainerElement.selectAll('path.link');
nodes = nodes.data(this.nodeLookup.values(), function (d: any) {
return d.id;
});
links = links.data(this.linkLookup.values(), function (d: any) {
return d.id;
});
nodes.exit().remove();
links.exit().remove();
});
}
@Output() submitLineageQuery: EventEmitter<LineageRequest> = new EventEmitter<LineageRequest>();
@Output() openEventDialog: EventEmitter<ProvenanceEventRequest> = new EventEmitter<ProvenanceEventRequest>();
@Output() goToProvenanceEventSource: EventEmitter<GoToProvenanceEventSourceRequest> =
new EventEmitter<GoToProvenanceEventSourceRequest>();
@Output() closeLineage: EventEmitter<void> = new EventEmitter<void>();
readonly ROOT_MENU: ContextMenuDefinition = {
id: 'root',
menuItems: [
{
condition: (selection: any) => {
return selection.empty();
},
clazz: 'fa fa-long-arrow-left',
text: 'Back to events',
action: (selection: any) => {
this.closeLineage.next();
}
},
{
condition: (selection: any) => {
return !selection.empty();
},
clazz: 'fa fa-info-circle',
text: 'View details',
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO cluster node id
this.openEventDialog.next({
id: selectionData.id
});
}
},
{
condition: (selection: any) => {
return !selection.empty();
},
clazz: 'fa fa-long-arrow-right',
text: 'Go to component',
action: (selection: any) => {
const selectionData: any = selection.datum();
this.goToProvenanceEventSource.next({
eventId: selectionData.id
});
}
},
{
condition: (selection: any) => {
if (selection.empty()) {
return false;
}
const selectionData: any = selection.datum();
return this.supportsExpandCollapse(selectionData);
},
clazz: 'fa fa-binoculars',
text: 'Find parents',
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO - cluster node id
this.submitLineageQuery.next({
lineageRequestType: 'PARENTS',
eventId: selectionData.id
// clusterNodeId: clusterNodeId
});
}
},
{
condition: (selection: any) => {
if (selection.empty()) {
return false;
}
const selectionData: any = selection.datum();
return this.supportsExpandCollapse(selectionData);
},
clazz: 'fa fa-plus-square',
text: 'Expand',
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO - cluster node id
this.submitLineageQuery.next({
lineageRequestType: 'CHILDREN',
eventId: selectionData.id
// clusterNodeId: clusterNodeId
});
}
},
{
condition: (selection: any) => {
if (selection.empty()) {
return false;
}
const selectionData: any = selection.datum();
return this.supportsExpandCollapse(selectionData);
},
clazz: 'fa fa-minus-square',
text: 'Collapse',
action: (selection: any) => {
const selectionData: any = selection.datum();
this.collapseLineage(selectionData);
}
}
]
};
private allMenus: Map<string, ContextMenuDefinition>;
lineageElement: any;
lineageContainerElement: any;
lineageContextmenu: ContextMenuDefinitionProvider;
private nodeLookup: Map<string, any> = new Map<string, any>();
private linkLookup: Map<string, any> = new Map<string, any>();
private previousEventTimestampThreshold: number = -1;
constructor() {
this.allMenus = new Map<string, ContextMenuDefinition>();
this.allMenus.set(this.ROOT_MENU.id, this.ROOT_MENU);
const self: LineageComponent = this;
this.lineageContextmenu = {
getMenu(menuId: string): ContextMenuDefinition | undefined {
return self.allMenus.get(menuId);
},
filterMenuItem(menuItem: ContextMenuItemDefinition): boolean {
// include if the condition matches
if (menuItem.condition) {
const selection: any = d3.select('circle.context');
return menuItem.condition(selection);
}
// include if there is no condition (non conditional item, separator, sub menu, etc)
return true;
},
menuItemClicked(menuItem: ContextMenuItemDefinition, event: MouseEvent) {
if (menuItem.action) {
const selection: any = d3.select('circle.context');
return menuItem.action(selection);
}
}
};
}
ngOnInit(): void {
const self: LineageComponent = this;
this.lineageElement = document.getElementById('lineage');
// handle zoom behavior
const lineageZoom: any = d3
.zoom()
.scaleExtent([0.2, 8])
.on('zoom', function (event) {
d3.select('g.lineage').attr('transform', function () {
return `translate(${event.transform.x}, ${event.transform.y}) scale(${event.transform.k})`;
});
});
// build the birdseye svg
const svg = d3
.select(this.lineageElement)
.append('svg')
.attr('width', '100%')
.attr('height', '100%')
.call(lineageZoom)
.on('dblclick.zoom', null);
svg.append('rect')
.attr('width', '100%')
.attr('height', '100%')
.attr('fill', '#f9fafb')
.on('mousedown', function (event, d) {
// hide the context menu if necessary
self.clearSelectionContext();
// prevents browser from using text cursor
event.preventDefault();
});
svg.append('defs')
.selectAll('marker')
.data(['FLOWFILE', 'FLOWFILE-SELECTED', 'EVENT', 'EVENT-SELECTED'])
.enter()
.append('marker')
.attr('id', function (d) {
return d;
})
.attr('viewBox', '0 -3 6 6')
.attr('refX', function (d) {
if (d.indexOf('FLOWFILE') >= 0) {
return 16;
} else {
return 11;
}
})
.attr('refY', 0)
.attr('markerWidth', 6)
.attr('markerHeight', 6)
.attr('orient', 'auto')
.attr('fill', function (d) {
if (d.indexOf('SELECTED') >= 0) {
return '#ba554a';
} else {
return '#000000';
}
})
.append('path')
.attr('d', 'M0,-3 L6,0 L0,3');
// group everything together
this.lineageContainerElement = svg
.append('g')
.attr('transform', 'translate(0, 0) scale(1)')
.attr('pointer-events', 'all')
.attr('class', 'lineage');
}
private supportsExpandCollapse(d: any): boolean {
return (
d.eventType === 'SPAWN' ||
d.eventType === 'CLONE' ||
d.eventType === 'FORK' ||
d.eventType === 'JOIN' ||
d.eventType === 'REPLAY'
);
}
private locateDescendants(nodeIds: string[], descendants: Set<string>, depth?: number): void {
nodeIds.forEach((nodeId) => {
const node: any = this.nodeLookup.get(nodeId);
const children: string[] = [];
node.outgoing.forEach((link: any) => {
children.push(link.target.id);
descendants.add(link.target.id);
});
if (depth == null) {
this.locateDescendants(children, descendants);
} else if (depth > 1) {
this.locateDescendants(children, descendants, depth - 1);
}
});
}
private positionNodes(nodeIds: string[], depth: number, parents: string[], levelDifference: number): void {
const { width } = this.lineageElement.getBoundingClientRect();
const immediateSet: Set<string> = new Set(nodeIds);
const childSet: Set<string> = new Set();
const descendantSet: Set<string> = new Set();
// locate children
this.locateDescendants(nodeIds, childSet, 1);
// locate all descendants (including children)
this.locateDescendants(nodeIds, descendantSet);
// push off processing a node until its deepest point
// by removing any descendants from the immediate nodes.
// in this case, a link is panning multiple levels
descendantSet.forEach(function (d) {
immediateSet.delete(d);
});
// convert the children to an array to ensure consistent
// order when performing index of checks below
const children: string[] = Array.from(childSet.values()).sort(d3.descending);
// convert the immediate to allow for sorting below
let immediate: string[] = Array.from(immediateSet.values());
// attempt to identify fan in/out cases
let nodesWithTwoParents: number = 0;
immediate.forEach((nodeId) => {
const node: any = this.nodeLookup.get(nodeId);
// identify fanning cases
if (node.incoming.length > 3) {
levelDifference = LineageComponent.DEFAULT_LEVEL_DIFFERENCE;
} else if (node.incoming.length >= 2) {
nodesWithTwoParents++;
}
});
// increase the level difference if more than two nodes have two or more parents
if (nodesWithTwoParents > 2) {
levelDifference = LineageComponent.DEFAULT_LEVEL_DIFFERENCE;
}
// attempt to sort the nodes to provide an optimum layout
if (parents.length === 1) {
immediate = immediate.sort((one: string, two: string) => {
const oneNode: any = this.nodeLookup.get(one);
const twoNode: any = this.nodeLookup.get(two);
// try to order by children
if (oneNode.outgoing.length > 0 && twoNode.outgoing.length > 0) {
const oneIndex: number = children.indexOf(oneNode.outgoing[0].target.id);
const twoIndex: number = children.indexOf(twoNode.outgoing[0].target.id);
if (oneIndex !== twoIndex) {
return oneIndex - twoIndex;
}
}
// try to order by parents
if (oneNode.incoming.length > 0 && twoNode.incoming.length > 0) {
const oneIndex: number = oneNode.incoming[0].source.index;
const twoIndex: number = twoNode.incoming[0].source.index;
if (oneIndex !== twoIndex) {
return oneIndex - twoIndex;
}
}
// type of node
if (oneNode.type !== twoNode.type) {
return oneNode.type > twoNode.type ? 1 : -1;
}
// type of event
if (oneNode.eventType !== twoNode.eventType) {
return oneNode.eventType > twoNode.eventType ? 1 : -1;
}
// timestamp
return oneNode.millis - twoNode.millis;
});
} else if (parents.length > 1) {
immediate = immediate.sort((one: string, two: string) => {
const oneNode: any = this.nodeLookup.get(one);
const twoNode: any = this.nodeLookup.get(two);
// try to order by parents
if (oneNode.incoming.length > 0 && twoNode.incoming.length > 0) {
const oneIndex: number = oneNode.incoming[0].source.index;
const twoIndex: number = twoNode.incoming[0].source.index;
if (oneIndex !== twoIndex) {
return oneIndex - twoIndex;
}
}
// try to order by children
if (oneNode.outgoing.length > 0 && twoNode.outgoing.length > 0) {
const oneIndex: number = children.indexOf(oneNode.outgoing[0].target.id);
const twoIndex: number = children.indexOf(twoNode.outgoing[0].target.id);
if (oneIndex !== twoIndex) {
return oneIndex - twoIndex;
}
}
// node type
if (oneNode.type !== twoNode.type) {
return oneNode.type > twoNode.type ? 1 : -1;
}
// event type
if (oneNode.eventType !== twoNode.eventType) {
return oneNode.eventType > twoNode.eventType ? 1 : -1;
}
// timestamp
return oneNode.millis - twoNode.millis;
});
}
let originX: number = width / 2;
if (parents.length > 0) {
const meanParentX: number | undefined = d3.mean(parents, (parentId: string) => {
const parent = this.nodeLookup.get(parentId);
return parent ? parent.x : undefined;
});
if (meanParentX) {
originX = meanParentX;
}
}
const depthWidth: number = (immediate.length - 1) * LineageComponent.DEFAULT_NODE_SPACING;
immediate.forEach((nodeId: string, i: number) => {
const node: any = this.nodeLookup.get(nodeId);
// set the y position based on the depth
node.y = levelDifference + depth - 25;
// ensure the children won't position on top of one another
// based on the number of parent nodes
if (immediate.length <= parents.length) {
if (node.incoming.length === 1) {
const parent: any = node.incoming[0].source;
if (parent.outgoing.length === 1) {
node.x = parent.x;
return;
}
} else if (node.incoming.length > 1) {
const nodesOnPreviousLevel: any = node.incoming.filter((link: any) => {
return node.y - link.source.y <= LineageComponent.DEFAULT_LEVEL_DIFFERENCE;
});
node.x = d3.mean(nodesOnPreviousLevel, function (link: any) {
return link.source.x;
});
return;
}
}
// evenly space the nodes under the origin
node.x = i * LineageComponent.DEFAULT_NODE_SPACING + originX - depthWidth / 2;
});
// sort the immediate nodes after positioning by the x coordinate
// so they can be shifted accordingly if necessary
const sortedImmediate: string[] = immediate.slice().sort((one: string, two: string) => {
const nodeOne: any = this.nodeLookup.get(one);
const nodeTwo: any = this.nodeLookup.get(two);
return nodeOne.x - nodeTwo.x;
});
// adjust the x positioning if necessary to avoid positioning on top
// of one another, only need to consider the x coordinate since the
// y coordinate will be the same for each node on this row
for (let i = 0; i < sortedImmediate.length - 1; i++) {
const first: any = this.nodeLookup.get(sortedImmediate[i]);
const second: any = this.nodeLookup.get(sortedImmediate[i + 1]);
const difference: number = second.x - first.x;
if (difference < LineageComponent.DEFAULT_NODE_SPACING) {
second.x += LineageComponent.DEFAULT_NODE_SPACING - difference;
}
}
// if there are children to position
if (children.length > 0) {
let childLevelDifference: number = LineageComponent.DEFAULT_LEVEL_DIFFERENCE / 3;
// resort the immediate values after each node has been positioned
immediate = immediate.sort((one, two) => {
const oneNode: any = this.nodeLookup.get(one);
const twoNode: any = this.nodeLookup.get(two);
return oneNode.x - twoNode.x;
});
// mark each nodes index so subsequent recursive calls can position children accordingly
let nodesWithTwoChildren: number = 0;
immediate.forEach((nodeId: string, i: number) => {
const node: any = this.nodeLookup.get(nodeId);
node.index = i;
// precompute the next level difference since we have easy access to going here
if (node.outgoing.length > 3) {
childLevelDifference = LineageComponent.DEFAULT_LEVEL_DIFFERENCE;
} else if (node.outgoing.length >= 2) {
nodesWithTwoChildren++;
}
});
// if there are at least two immediate nodes with two or more children, increase the level difference
if (nodesWithTwoChildren > 2) {
childLevelDifference = LineageComponent.DEFAULT_LEVEL_DIFFERENCE;
}
// position the children
this.positionNodes(children, levelDifference + depth, immediate, childLevelDifference);
}
}
private addLineage(nodes: LineageNode[], links: LineageLink[]): void {
// add the new nodes
nodes.forEach((node) => {
if (this.nodeLookup.has(node.id)) {
return;
}
// add values to the node to support rendering
this.nodeLookup.set(node.id, {
...node,
x: 0,
y: 0,
visible: true
});
});
// add the new links
links.forEach((link) => {
const linkId: string = `${link.sourceId}-${link.targetId}`;
// create the link object
this.linkLookup.set(linkId, {
id: linkId,
source: this.nodeLookup.get(link.sourceId),
target: this.nodeLookup.get(link.targetId),
flowFileUuid: link.flowFileUuid,
millis: link.millis,
visible: true
});
});
this.refresh();
}
private refresh(): void {
// consider all nodes as starting points
const startNodes: Set<string> = new Set(this.nodeLookup.keys());
// go through the nodes to reset their outgoing links
this.nodeLookup.forEach(function (node, id) {
node.outgoing = [];
node.incoming = [];
});
// go through the links in order to compute the new layout
this.linkLookup.forEach(function (link, id) {
// updating the nodes connections
link.source.outgoing.push(link);
link.target.incoming.push(link);
// remove the target from being a potential starting node
startNodes.delete(link.target.id);
});
// position the nodes
this.positionNodes(Array.from(startNodes.values()), 1, [], 50);
// update the layout
this.update();
}
private collapseLineage(d: any): void {
const eventId: string = d.id;
const eventUuid: string = d.flowFileUuid;
const eventChildUuids: string[] = d.childUuids;
const fanIn: boolean = eventChildUuids.includes(eventUuid);
// determines if the specified event should be removable based on if the collapsing is fanning in/out
const allowEventRemoval = (node: any): boolean => {
if (fanIn) {
return node.id !== eventId;
} else {
return node.flowFileUuid !== eventUuid && !node.parentUuids?.includes(eventUuid);
}
};
// determines if the specified link should be removable based on if the collapsing is fanning in/out
const allowLinkRemoval = (link: any): boolean => {
if (fanIn) {
return true;
} else {
return link.flowFileUuid !== eventUuid;
}
};
// collapses the specified uuids
const collapse = (uuids: string[]): void => {
const newUuids: string[] = [];
// consider each node for being collapsed
this.nodeLookup.forEach((node) => {
// if this node is in the uuids remove it unless it's the original event or is part of this and another lineage
if (uuids.includes(node.flowFileUuid) && allowEventRemoval(node)) {
// remove it from the look lookup
this.nodeLookup.delete(node.id);
// include all related outgoing flow file uuids
node.outgoing.forEach((outgoing: any) => {
if (!uuids.includes(outgoing.flowFileUuid)) {
newUuids.push(outgoing.flowFileUuid);
}
});
}
});
// update the link data
this.linkLookup.forEach((link) => {
// if this link is in the uuids remove it
if (uuids.includes(link.flowFileUuid) && allowLinkRemoval(link)) {
// remove it from the link lookup
this.linkLookup.delete(link.id);
// add a related uuid that needs to be collapse
const next = link.target;
if (!uuids.includes(next.flowFileUuid)) {
newUuids.push(next.flowFileUuid);
}
}
});
// collapse any related uuids
if (newUuids.length > 0) {
collapse(newUuids);
}
};
// collapse the specified uuids
collapse(eventChildUuids);
// update the layout
this.refresh();
}
private clearSelectionContext(): void {
d3.selectAll('circle.context').classed('context', false);
}
private renderFlowFile(flowfiles: any): void {
const self: LineageComponent = this;
flowfiles.classed('flowfile', true).on('mousedown', function (event: MouseEvent, d: any) {
self.clearSelectionContext();
event.stopPropagation();
});
// node
flowfiles
.append('circle')
.attr('r', 16)
.attr('fill', '#fff')
.attr('stroke', '#000')
.attr('stroke-width', 1.0)
.on('mouseover', function (event: MouseEvent, d: any) {
self.lineageContainerElement
.selectAll('path.link')
.filter(function (linkDatum: any) {
return d.id === linkDatum.flowFileUuid;
})
.classed('selected', true)
.attr('marker-end', function (d: any) {
return `url(#${d.target.type}-SELECTED)`;
});
})
.on('mouseout', function (event: MouseEvent, d: any) {
self.lineageContainerElement
.selectAll('path.link')
.filter(function (linkDatum: any) {
return d.id === linkDatum.flowFileUuid;
})
.classed('selected', false)
.attr('marker-end', function (d: any) {
return `url(#${d.target.type})`;
});
});
flowfiles
.append('g')
.attr('class', 'flowfile-icon')
.attr('transform', function () {
return 'translate(-9,-9)';
})
.append('text')
.attr('font-family', 'flowfont')
.attr('font-size', '18px')
.attr('fill', '#ad9897')
.attr('transform', function () {
return 'translate(0,15)';
})
.on('mouseover', function (event: MouseEvent, d: any) {
self.lineageContainerElement
.selectAll('path.link')
.filter(function (linkDatum: any) {
return d.id === linkDatum.flowFileUuid;
})
.classed('selected', true)
.attr('marker-end', function (d: any) {
return `url(#${d.target.type}-SELECTED)`;
});
})
.on('mouseout', function (event: MouseEvent, d: any) {
self.lineageContainerElement
.selectAll('path.link')
.filter(function (linkDatum: any) {
return d.id === linkDatum.flowFileUuid;
})
.classed('selected', false)
.attr('marker-end', function (d: any) {
return `url(#${d.target.type})`;
});
})
.text(function () {
return '\ue808';
});
}
private renderEvent(events: any): void {
const self: LineageComponent = this;
events
.on('mousedown', function (event: MouseEvent, d: any) {
self.clearSelectionContext();
d3.select(`#event-node-${d.id}`).classed('context', true);
event.stopPropagation();
})
.on('dblclick', function (event: MouseEvent, d: any) {
// show the event details
// TODO - cluster node id
self.openEventDialog.next({
id: d.id
});
});
events
.classed('event', true)
// join node to its label
.append('rect')
.attr('x', 0)
.attr('y', -8)
.attr('height', 16)
.attr('width', 1)
.attr('opacity', 0)
.attr('id', function (d: any) {
return `event-filler-${d.id}`;
});
events
.append('circle')
.classed('selected', function (d: any) {
return d.id === self.eventId;
})
.attr('r', 8)
.attr('fill', '#aabbc3')
.attr('stroke', '#000')
.attr('stroke-width', 1.0)
.attr('id', function (d: any) {
return `event-node-${d.id}`;
});
events
.append('text')
.attr('id', function (d: any) {
return `event-text-${d.id}`;
})
.attr('class', 'event-type')
.classed('expand-parents', function (d: any) {
return d.eventType === 'SPAWN';
})
.classed('expand-children', function (d: any) {
return d.eventType === 'SPAWN';
})
.each(function (this: any, d: any) {
const label: any = d3.select(this);
if (d.eventType === 'CONTENT_MODIFIED' || d.eventType === 'ATTRIBUTES_MODIFIED') {
const lines: string[] = [];
if (d.eventType === 'CONTENT_MODIFIED') {
lines.push('CONTENT');
} else {
lines.push('ATTRIBUTES');
}
lines.push('MODIFIED');
// append each line
lines.forEach((line) => {
label
.append('tspan')
.attr('x', '0')
.attr('dy', '1.2em')
.text(function () {
return line;
});
});
label.attr('transform', 'translate(10,-14)');
} else {
label.text(d.eventType).attr('x', 10).attr('y', 4);
}
});
}
private update(): void {
const { width } = this.lineageElement.getBoundingClientRect();
// select the nodes
const nodeSelection: any = this.lineageContainerElement
.selectAll('g.node')
.data(this.nodeLookup.values(), function (d: any) {
return d.id;
});
// enter
const nodesEntered: any = nodeSelection
.enter()
.append('g')
.attr('id', function (d: any) {
return `lineage-group-${d.id}`;
})
.classed('node', true)
.attr('transform', function (d: any) {
if (d.incoming.length === 0) {
return `translate(${width / 2},50)`;
} else {
return `translate(${d.incoming[0].source.x},${d.incoming[0].source.y})`;
}
})
.style('opacity', 0);
// treat flowfiles and events differently
this.renderFlowFile(
nodesEntered.filter(function (d: any) {
return d.type === 'FLOWFILE';
})
);
this.renderEvent(
nodesEntered.filter(function (d: any) {
return d.type === 'EVENT';
})
);
// merge
const nodesUpdated = nodeSelection.merge(nodesEntered);
// update the nodes
nodesUpdated
.transition()
.duration(400)
.attr('transform', function (d: any) {
return `translate(${d.x}, ${d.y})`;
})
.style('opacity', 1)
.on('end', function (this: any) {
d3.select(this).classed('rendered', true);
});
// exit
nodeSelection
.exit()
.transition()
.delay(200)
.duration(400)
.attr('transform', function (d: any) {
if (d.incoming.length === 0) {
return `translate(${width / 2},50)`;
} else {
return `translate(${d.incoming[0].source.x},${d.incoming[0].source.y})`;
}
})
.style('opacity', 0)
.remove();
// select the links
let linkSelection: any = this.lineageContainerElement
.selectAll('path.link')
.data(this.linkLookup.values(), function (d: any) {
return d.id;
});
// add new links
const linksEntered = linkSelection
.enter()
.insert('path', '.node')
.attr('class', 'link')
.attr('stroke-width', 1.5)
.attr('stroke', '#000')
.attr('fill', 'none')
.attr('d', function (d: any) {
return `M${d.source.x},${d.source.y}L${d.source.x},${d.source.y}`;
})
.style('opacity', 0);
// merge
const linksUpdated = linkSelection.merge(linksEntered).attr('marker-end', '');
// update the links
linksUpdated
.transition()
.delay(200)
.duration(400)
.attr('marker-end', function (d: any) {
return `url(#${d.target.type})`;
})
.attr('d', function (d: any) {
return `M${d.source.x},${d.source.y}L${d.target.x},${d.target.y}`;
})
.style('opacity', 1)
.on('end', function (this: any) {
d3.select(this).classed('rendered', true);
});
// exit
linkSelection
.exit()
.attr('marker-end', '')
.transition()
.duration(400)
.attr('d', function (d: any) {
return `M${d.source.x},${d.source.y}L${d.source.x},${d.source.y}`;
})
.style('opacity', 0)
.remove();
}
}

View File

@ -15,9 +15,12 @@
~ limitations under the License. ~ limitations under the License.
--> -->
<div class="provenance-event-table h-full flex flex-col"> <div class="provenance-event-table h-full">
<div [class.hidden]="showLineage" class="h-full flex flex-col gap-y-2">
<div class="flex flex-col"> <div class="flex flex-col">
<div class="value font-bold">Displaying {{ filteredCount }} of {{ totalCount }}</div> <div [class.invisible]="!filterApplied" class="value font-bold">
Filter matched {{ filteredCount }} of {{ totalCount }}
</div>
<div class="flex justify-between"> <div class="flex justify-between">
<div> <div>
Oldest event available: <span class="value">{{ oldestEventAvailable }}</span> Oldest event available: <span class="value">{{ oldestEventAvailable }}</span>
@ -55,7 +58,7 @@
</div> </div>
</div> </div>
</div> </div>
<div class="flex-1 relative"> <div class="flex-1 relative -mt-4">
<div class="listing-table border absolute inset-0 overflow-y-auto"> <div class="listing-table border absolute inset-0 overflow-y-auto">
<table <table
mat-table mat-table
@ -129,16 +132,15 @@
<th mat-header-cell *matHeaderCellDef></th> <th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item"> <td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3"> <div class="flex items-center gap-x-3">
<!-- <div--> <div
<!-- class="pointer icon icon-lineage"--> class="pointer icon icon-lineage"
<!-- *ngIf="canStop(item)"--> (click)="showLineageGraph(item)"
<!-- (click)="stopClicked(item)"--> title="Show Lineage"></div>
<!-- title="Stop"></div>-->
<div <div
*ngIf="supportsGoTo(item)" *ngIf="supportsGoTo(item)"
class="pointer fa fa-long-arrow-right" class="pointer fa fa-long-arrow-right"
title="Go To" title="Go To"
[routerLink]="getComponentLink(item)"></div> (click)="goToClicked(item)"></div>
</div> </div>
</td> </td>
</ng-container> </ng-container>
@ -153,4 +155,46 @@
</table> </table>
</div> </div>
</div> </div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshClicked()">
<i class="fa fa-refresh" [class.fa-spin]="loading"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp }}</div>
</div>
<div>
<mat-paginator [pageSize]="100" [hidePageSize]="true" [showFirstLastButtons]="true"></mat-paginator>
</div>
</div>
</div>
<div [class.hidden]="!showLineage" class="lineage h-full relative">
<div class="lineage-controls flex">
<div class="pointer fa fa-long-arrow-left" (click)="hideLineageGraph()" title="Go back to event list"></div>
</div>
<div class="lineage-slider flex flex-col pl-4">
<div class="w-80">
<mat-slider
[min]="minEventTimestamp"
[max]="maxEventTimestamp"
[step]="eventTimestampStep"
[discrete]="false"
showTickMarks>
<input matSliderThumb [value]="initialEventTimestampThreshold" (input)="handleInput($event)" />
</mat-slider>
</div>
<div class="value">
{{ formatLabel(currentEventTimestampThreshold) }}
</div>
</div>
<lineage
[lineage]="(provenanceLineage$ | async)!"
[eventId]="eventId"
[reset]="resetLineage"
[eventTimestampThreshold]="currentEventTimestampThreshold"
(openEventDialog)="submitProvenanceEventRequest($event)"
(goToProvenanceEventSource)="goToEventSource($event)"
(submitLineageQuery)="submitLineageQuery($event)"
(closeLineage)="hideLineageGraph()"></lineage>
</div>
</div> </div>

View File

@ -24,4 +24,36 @@
} }
} }
} }
.lineage {
border: 1px solid #e5ebed;
background-color: #f9fafb;
.lineage-controls {
position: absolute;
top: 5px;
right: 5px;
z-index: 1;
.fa,
.icon {
color: #004849;
width: 14px;
height: 14px;
text-align: center;
}
}
.lineage-slider {
position: absolute;
bottom: 10px;
left: 5px;
z-index: 1;
.mat-mdc-slider {
width: 100%;
margin: 0;
}
}
}
} }

View File

@ -27,11 +27,16 @@ import { MatInputModule } from '@angular/material/input';
import { MatOptionModule } from '@angular/material/core'; import { MatOptionModule } from '@angular/material/core';
import { MatSelectModule } from '@angular/material/select'; import { MatSelectModule } from '@angular/material/select';
import { FormBuilder, FormGroup, ReactiveFormsModule } from '@angular/forms'; import { FormBuilder, FormGroup, ReactiveFormsModule } from '@angular/forms';
import { NgForOf, NgIf } from '@angular/common'; import { AsyncPipe, NgForOf, NgIf } from '@angular/common';
import { debounceTime } from 'rxjs'; import { debounceTime, Observable, tap } from 'rxjs';
import { ProvenanceEventSummary } from '../../../../../state/shared'; import { ProvenanceEventSummary } from '../../../../../state/shared';
import { RouterLink } from '@angular/router'; import { RouterLink } from '@angular/router';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader'; import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { MatPaginator, MatPaginatorModule } from '@angular/material/paginator';
import { Lineage, LineageRequest } from '../../../state/lineage';
import { LineageComponent } from './lineage/lineage.component';
import { GoToProvenanceEventSourceRequest, ProvenanceEventRequest } from '../../../state/provenance-event-listing';
import { MatSliderModule } from '@angular/material/slider';
@Component({ @Component({
selector: 'provenance-event-table', selector: 'provenance-event-table',
@ -48,7 +53,11 @@ import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
NgForOf, NgForOf,
NgIf, NgIf,
RouterLink, RouterLink,
NgxSkeletonLoaderModule NgxSkeletonLoaderModule,
AsyncPipe,
MatPaginatorModule,
LineageComponent,
MatSliderModule
], ],
styleUrls: ['./provenance-event-table.component.scss', '../../../../../../assets/styles/listing-table.scss'] styleUrls: ['./provenance-event-table.component.scss', '../../../../../../assets/styles/listing-table.scss']
}) })
@ -77,16 +86,68 @@ export class ProvenanceEventTable implements AfterViewInit {
if (filterTerm?.length > 0) { if (filterTerm?.length > 0) {
const filterColumn = this.filterForm.get('filterColumn')?.value; const filterColumn = this.filterForm.get('filterColumn')?.value;
this.applyFilter(filterTerm, filterColumn); this.applyFilter(filterTerm, filterColumn);
} else {
this.resetPaginator();
} }
} }
} }
@Input() oldestEventAvailable!: string; @Input() oldestEventAvailable!: string;
@Input() timeOffset!: number;
@Input() resultsMessage!: string; @Input() resultsMessage!: string;
@Input() hasRequest!: boolean; @Input() hasRequest!: boolean;
@Input() loading!: boolean;
@Input() loadedTimestamp!: string;
@Input() set lineage$(lineage$: Observable<Lineage | null>) {
this.provenanceLineage$ = lineage$.pipe(
tap((lineage) => {
let minMillis: number = -1;
let maxMillis: number = -1;
lineage?.results.nodes.forEach((node) => {
// ensure this event has an event time
if (minMillis < 0 || minMillis > node.millis) {
minMillis = node.millis;
}
if (maxMillis < 0 || maxMillis < node.millis) {
maxMillis = node.millis;
}
});
if (this.minEventTimestamp < 0 || minMillis < this.minEventTimestamp) {
this.minEventTimestamp = minMillis;
}
if (this.maxEventTimestamp < 0 || maxMillis > this.maxEventTimestamp) {
this.maxEventTimestamp = maxMillis;
}
// determine the range for the slider
let range: number = this.maxEventTimestamp - this.minEventTimestamp;
const binCount: number = 10;
const remainder: number = range % binCount;
if (remainder > 0) {
// if the range doesn't fall evenly into binCount, increase the
// range by the difference to ensure it does
this.maxEventTimestamp += binCount - remainder;
range = this.maxEventTimestamp - this.minEventTimestamp;
}
this.eventTimestampStep = range / binCount;
this.initialEventTimestampThreshold = this.maxEventTimestamp;
this.currentEventTimestampThreshold = this.initialEventTimestampThreshold;
})
);
}
@Output() openSearchCriteria: EventEmitter<void> = new EventEmitter<void>(); @Output() openSearchCriteria: EventEmitter<void> = new EventEmitter<void>();
@Output() clearRequest: EventEmitter<void> = new EventEmitter<void>(); @Output() clearRequest: EventEmitter<void> = new EventEmitter<void>();
@Output() openEventDialog: EventEmitter<ProvenanceEventSummary> = new EventEmitter<ProvenanceEventSummary>(); @Output() openEventDialog: EventEmitter<ProvenanceEventRequest> = new EventEmitter<ProvenanceEventRequest>();
@Output() goToProvenanceEventSource: EventEmitter<GoToProvenanceEventSourceRequest> =
new EventEmitter<GoToProvenanceEventSourceRequest>();
@Output() resubmitProvenanceQuery: EventEmitter<void> = new EventEmitter<void>();
@Output() queryLineage: EventEmitter<LineageRequest> = new EventEmitter<LineageRequest>();
@Output() resetLineage: EventEmitter<void> = new EventEmitter<void>();
protected readonly TextTip = TextTip; protected readonly TextTip = TextTip;
protected readonly BulletinsTip = BulletinsTip; protected readonly BulletinsTip = BulletinsTip;
@ -106,6 +167,8 @@ export class ProvenanceEventTable implements AfterViewInit {
dataSource: MatTableDataSource<ProvenanceEventSummary> = new MatTableDataSource<ProvenanceEventSummary>(); dataSource: MatTableDataSource<ProvenanceEventSummary> = new MatTableDataSource<ProvenanceEventSummary>();
selectedEventId: string | null = null; selectedEventId: string | null = null;
@ViewChild(MatPaginator) paginator!: MatPaginator;
sort: Sort = { sort: Sort = {
active: 'eventTime', active: 'eventTime',
direction: 'desc' direction: 'desc'
@ -115,6 +178,17 @@ export class ProvenanceEventTable implements AfterViewInit {
filterColumnOptions: string[] = ['component name', 'component type', 'type']; filterColumnOptions: string[] = ['component name', 'component type', 'type'];
totalCount: number = 0; totalCount: number = 0;
filteredCount: number = 0; filteredCount: number = 0;
filterApplied: boolean = false;
showLineage: boolean = false;
provenanceLineage$!: Observable<Lineage | null>;
eventId: string | null = null;
minEventTimestamp: number = -1;
maxEventTimestamp: number = -1;
eventTimestampStep: number = 1;
initialEventTimestampThreshold: number = 0;
currentEventTimestampThreshold: number = 0;
constructor( constructor(
private formBuilder: FormBuilder, private formBuilder: FormBuilder,
@ -124,11 +198,14 @@ export class ProvenanceEventTable implements AfterViewInit {
} }
ngAfterViewInit(): void { ngAfterViewInit(): void {
this.dataSource.paginator = this.paginator;
this.filterForm this.filterForm
.get('filterTerm') .get('filterTerm')
?.valueChanges.pipe(debounceTime(500)) ?.valueChanges.pipe(debounceTime(500))
.subscribe((filterTerm: string) => { .subscribe((filterTerm: string) => {
const filterColumn = this.filterForm.get('filterColumn')?.value; const filterColumn = this.filterForm.get('filterColumn')?.value;
this.filterApplied = filterTerm.length > 0;
this.applyFilter(filterTerm, filterColumn); this.applyFilter(filterTerm, filterColumn);
}); });
@ -179,6 +256,13 @@ export class ProvenanceEventTable implements AfterViewInit {
applyFilter(filterTerm: string, filterColumn: string) { applyFilter(filterTerm: string, filterColumn: string) {
this.dataSource.filter = `${filterTerm}|${filterColumn}`; this.dataSource.filter = `${filterTerm}|${filterColumn}`;
this.filteredCount = this.dataSource.filteredData.length; this.filteredCount = this.dataSource.filteredData.length;
this.resetPaginator();
}
resetPaginator(): void {
if (this.dataSource.paginator) {
this.dataSource.paginator.firstPage();
}
} }
clearRequestClicked(): void { clearRequestClicked(): void {
@ -190,7 +274,14 @@ export class ProvenanceEventTable implements AfterViewInit {
} }
viewDetailsClicked(event: ProvenanceEventSummary) { viewDetailsClicked(event: ProvenanceEventSummary) {
this.openEventDialog.next(event); this.submitProvenanceEventRequest({
id: event.id,
clusterNodeId: event.clusterNodeId
});
}
submitProvenanceEventRequest(request: ProvenanceEventRequest): void {
this.openEventDialog.next(request);
} }
select(event: ProvenanceEventSummary): void { select(event: ProvenanceEventSummary): void {
@ -216,19 +307,59 @@ export class ProvenanceEventTable implements AfterViewInit {
return true; return true;
} }
getComponentLink(event: ProvenanceEventSummary): string[] { goToClicked(event: ProvenanceEventSummary): void {
let link: string[]; this.goToEventSource({
componentId: event.componentId,
if (event.groupId == event.componentId) { groupId: event.groupId
link = ['/process-groups', event.componentId]; });
} else if (event.componentId === 'Connection' || event.componentId === 'Load Balanced Connection') {
link = ['/process-groups', event.groupId, 'Connection', event.componentId];
} else if (event.componentId === 'Output Port') {
link = ['/process-groups', event.groupId, 'OutputPort', event.componentId];
} else {
link = ['/process-groups', event.groupId, 'Processor', event.componentId];
} }
return link; goToEventSource(request: GoToProvenanceEventSourceRequest): void {
this.goToProvenanceEventSource.next(request);
}
showLineageGraph(event: ProvenanceEventSummary): void {
this.eventId = event.id;
this.showLineage = true;
this.submitLineageQuery({
lineageRequestType: 'FLOWFILE',
uuid: event.flowFileUuid,
clusterNodeId: event.clusterNodeId
});
}
submitLineageQuery(request: LineageRequest): void {
this.queryLineage.next(request);
}
hideLineageGraph(): void {
this.showLineage = false;
this.minEventTimestamp = -1;
this.maxEventTimestamp = -1;
this.eventTimestampStep = 1;
this.initialEventTimestampThreshold = 0;
this.currentEventTimestampThreshold = 0;
this.resetLineage.next();
}
formatLabel(value: number): string {
// get the current user time to properly convert the server time
const now: Date = new Date();
// convert the user offset to millis
const userTimeOffset: number = now.getTimezoneOffset() * 60 * 1000;
// create the proper date by adjusting by the offsets
const date: Date = new Date(value + userTimeOffset + this.timeOffset);
return this.nifiCommon.formatDateTime(date);
}
handleInput(event: any): void {
this.currentEventTimestampThreshold = Number(event.target.value);
}
refreshClicked(): void {
this.resubmitProvenanceQuery.next();
} }
} }

View File

@ -33,7 +33,10 @@
<span class="context-menu-item-text">{{ item.text }}</span> <span class="context-menu-item-text">{{ item.text }}</span>
<span class="context-menu-group-item-img fa fa-caret-right"></span> <span class="context-menu-group-item-img fa fa-caret-right"></span>
</button> </button>
<fd-context-menu #menuComponent [menuId]="item.subMenuId"></fd-context-menu> <fd-context-menu
#menuComponent
[menuProvider]="menuProvider"
[menuId]="item.subMenuId"></fd-context-menu>
</ng-container> </ng-container>
<ng-template #regularMenuItem> <ng-template #regularMenuItem>

View File

@ -19,7 +19,7 @@ import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ContextMenu } from './context-menu.component'; import { ContextMenu } from './context-menu.component';
import { provideMockStore } from '@ngrx/store/testing'; import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../../state/flow/flow.reducer'; import { initialState } from '../../../pages/flow-designer/state/flow/flow.reducer';
describe('ContextMenu', () => { describe('ContextMenu', () => {
let component: ContextMenu; let component: ContextMenu;

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, Input, OnInit, TemplateRef, ViewChild } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { AsyncPipe, NgForOf, NgIf } from '@angular/common';
import { CdkMenu, CdkMenuItem, CdkMenuTrigger } from '@angular/cdk/menu';
export interface ContextMenuDefinitionProvider {
getMenu(menuId: string): ContextMenuDefinition | undefined;
filterMenuItem(menuItem: ContextMenuItemDefinition): boolean;
menuItemClicked(menuItem: ContextMenuItemDefinition, event: MouseEvent): void;
}
export interface ContextMenuItemDefinition {
isSeparator?: boolean;
condition?: Function;
clazz?: string;
text?: string;
subMenuId?: string;
action?: Function;
}
export interface ContextMenuDefinition {
id: string;
menuItems: ContextMenuItemDefinition[];
}
@Component({
selector: 'fd-context-menu',
standalone: true,
templateUrl: './context-menu.component.html',
imports: [NgForOf, AsyncPipe, CdkMenu, CdkMenuItem, NgIf, CdkMenuTrigger],
styleUrls: ['./context-menu.component.scss']
})
export class ContextMenu implements OnInit {
@Input() menuProvider!: ContextMenuDefinitionProvider;
@Input() menuId: string | undefined;
@ViewChild('menu', { static: true }) menu!: TemplateRef<any>;
private showFocused: Subject<boolean> = new Subject();
showFocused$: Observable<boolean> = this.showFocused.asObservable();
constructor() {}
getMenuItems(menuId: string | undefined): ContextMenuItemDefinition[] {
if (menuId) {
const menuDefinition: ContextMenuDefinition | undefined = this.menuProvider.getMenu(menuId);
if (menuDefinition) {
// find all applicable menu items
let applicableMenuItems = menuDefinition.menuItems.filter((menuItem: ContextMenuItemDefinition) => {
// include if the condition matches
if (menuItem.condition) {
return this.menuProvider.filterMenuItem(menuItem);
}
// include if the sub menu has items
if (menuItem.subMenuId) {
return this.getMenuItems(menuItem.subMenuId).length > 0;
}
return true;
});
// remove any extra separators
applicableMenuItems = applicableMenuItems.filter(
(menuItem: ContextMenuItemDefinition, index: number) => {
if (menuItem.isSeparator && index > 0) {
// cannot have two consecutive separators
return !applicableMenuItems[index - 1].isSeparator;
}
return true;
}
);
return applicableMenuItems.filter((menuItem: ContextMenuItemDefinition, index: number) => {
if (menuItem.isSeparator) {
// a separator cannot be first
if (index === 0) {
return false;
}
// a separator cannot be last
if (index >= applicableMenuItems.length - 1) {
return false;
}
}
return true;
});
} else {
return [];
}
}
return [];
}
hasSubMenu(menuItemDefinition: ContextMenuItemDefinition): boolean {
return !!menuItemDefinition.subMenuId;
}
keydown(event: KeyboardEvent): void {
// TODO - Currently the first item in the context menu is auto focused. By default, this is rendered with an
// outline. This appears to be an issue with the cdkMenu/cdkMenuItem so we are working around it by manually
// overriding styles.
this.showFocused.next(true);
}
ngOnInit(): void {
this.showFocused.next(false);
}
menuItemClicked(menuItem: ContextMenuItemDefinition, event: MouseEvent) {
this.menuProvider.menuItemClicked(menuItem, event);
}
}

View File

@ -18,7 +18,6 @@
import { Component, Input } from '@angular/core'; import { Component, Input } from '@angular/core';
import { import {
BulletinsTipInput, BulletinsTipInput,
ComponentType,
ControllerServiceReferencingComponent, ControllerServiceReferencingComponent,
ControllerServiceReferencingComponentEntity, ControllerServiceReferencingComponentEntity,
ValidationErrorsTipInput ValidationErrorsTipInput