[NIFI-12504] Remaining views on Summary page (#8174)

* [NIFI-12504] process group summary tab
* ellipsify data/headers in table
* input and output port summary tabs
* connections summary tab
* rpg summary tab

* address review feedback including a filtering approach that doesn't rely on splitting strings by some arbitray character.

* remove unnecessary try/catch blocks

This closes #8174
This commit is contained in:
Rob Fellows 2023-12-22 14:22:10 -05:00 committed by GitHub
parent 553a36c088
commit 30419c8dd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 3598 additions and 143 deletions

View File

@ -21,6 +21,7 @@ import { MatTableDataSource } from '@angular/material/table';
import { MatSort } from '@angular/material/sort';
import { FormBuilder, FormGroup } from '@angular/forms';
import { debounceTime } from 'rxjs';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
@Component({
selector: 'counter-table',
@ -58,14 +59,11 @@ export class CounterTable implements AfterViewInit {
};
this.dataSource.filterPredicate = (data: CounterEntity, filter: string) => {
const filterArray = filter.split('|');
const filterTerm = filterArray[0];
const filterColumn = filterArray[1];
const { filterTerm, filterColumn } = JSON.parse(filter);
if (filterColumn === 'name') {
return data.name.toLowerCase().indexOf(filterTerm.toLowerCase()) >= 0;
return this.nifiCommon.stringContains(data.name, filterTerm, true);
} else {
return data.context.toLowerCase().indexOf(filterTerm.toLowerCase()) >= 0;
return this.nifiCommon.stringContains(data.context, filterTerm, true);
}
};
this.totalCount = counterEntities.length;
@ -97,7 +95,10 @@ export class CounterTable implements AfterViewInit {
@ViewChild(MatSort) sort!: MatSort;
constructor(private formBuilder: FormBuilder) {
constructor(
private formBuilder: FormBuilder,
private nifiCommon: NiFiCommon
) {
this.filterForm = this.formBuilder.group({ filterTerm: '', filterColumn: 'name' });
}
@ -119,7 +120,7 @@ export class CounterTable implements AfterViewInit {
}
applyFilter(filterTerm: string, filterColumn: string) {
this.dataSource.filter = `${filterTerm}|${filterColumn}`;
this.dataSource.filter = JSON.stringify({ filterTerm, filterColumn });
this.filteredCount = this.dataSource.filteredData.length;
}

View File

@ -66,16 +66,14 @@ export class ProvenanceEventTable implements AfterViewInit {
if (events) {
this.dataSource.data = this.sortEvents(events, this.sort);
this.dataSource.filterPredicate = (data: ProvenanceEventSummary, filter: string) => {
const filterArray = filter.split('|');
const filterTerm = filterArray[0];
const filterColumn = filterArray[1];
const { filterTerm, filterColumn } = JSON.parse(filter);
if (filterColumn === this.filterColumnOptions[0]) {
return data.componentName.toLowerCase().indexOf(filterTerm.toLowerCase()) >= 0;
return this.nifiCommon.stringContains(data.componentName, filterTerm, true);
} else if (filterColumn === this.filterColumnOptions[1]) {
return data.componentType.toLowerCase().indexOf(filterTerm.toLowerCase()) >= 0;
return this.nifiCommon.stringContains(data.componentType, filterTerm, true);
} else {
return data.eventType.toLowerCase().indexOf(filterTerm.toLowerCase()) >= 0;
return this.nifiCommon.stringContains(data.eventType, filterTerm, true);
}
};
this.totalCount = events.length;
@ -254,7 +252,7 @@ export class ProvenanceEventTable implements AfterViewInit {
}
applyFilter(filterTerm: string, filterColumn: string) {
this.dataSource.filter = `${filterTerm}|${filterColumn}`;
this.dataSource.filter = JSON.stringify({ filterTerm, filterColumn });
this.filteredCount = this.dataSource.filteredData.length;
this.resetPaginator();
}

View File

@ -49,23 +49,71 @@ const routes: Routes = [
},
{
path: 'input-ports',
component: InputPortStatusListing
component: InputPortStatusListing,
children: [
{
path: ':id',
component: InputPortStatusListing
}
]
},
{
path: 'output-ports',
component: OutputPortStatusListing
component: OutputPortStatusListing,
children: [
{
path: ':id',
component: OutputPortStatusListing
}
]
},
{
path: 'remote-process-groups',
component: RemoteProcessGroupStatusListing
component: RemoteProcessGroupStatusListing,
children: [
{
path: ':id',
component: RemoteProcessGroupStatusListing,
children: [
{
path: 'history',
component: RemoteProcessGroupStatusListing
}
]
}
]
},
{
path: 'connections',
component: ConnectionStatusListing
component: ConnectionStatusListing,
children: [
{
path: ':id',
component: ConnectionStatusListing,
children: [
{
path: 'history',
component: ConnectionStatusListing
}
]
}
]
},
{
path: 'process-groups',
component: ProcessGroupStatusListing
component: ProcessGroupStatusListing,
children: [
{
path: ':id',
component: ProcessGroupStatusListing,
children: [
{
path: 'history',
component: ProcessGroupStatusListing
}
]
}
]
}
]
}

View File

@ -53,10 +53,29 @@ export interface ConnectionStatusSnapshot extends BaseSnapshot {
sourceName: string;
}
export interface RemoteProcessGroupStatusSnapshot {
activeThreadCount: number;
bytesReceived: number;
bytesSent: number;
flowFilesReceived: number;
flowFilesSent: number;
groupId: string;
id: string;
name: string;
received: string;
sent: string;
targetUri: string;
transmissionStatus: string;
}
export interface ConnectionStatusSnapshotEntity extends BaseSnapshotEntity {
connectionStatusSnapshot: ConnectionStatusSnapshot;
}
export interface RemoteProcessGroupStatusSnapshotEntity extends BaseSnapshotEntity {
remoteProcessGroupStatusSnapshot: RemoteProcessGroupStatusSnapshot;
}
export interface ProcessorStatusSnapshot extends BaseSnapshot {
activeThreadCount: number;
bytesRead: number;
@ -84,13 +103,25 @@ export interface ProcessGroupStatusSnapshotEntity extends BaseSnapshotEntity {
processGroupStatusSnapshot: ProcessGroupStatusSnapshot;
}
export interface PortStatusSnapshotEntity extends BaseSnapshotEntity {
portStatusSnapshot: PortStatusSnapshot;
}
export enum VersionedFlowState {
SYNC_FAILURE = 'SYNC_FAILURE',
LOCALLY_MODIFIED = 'LOCALLY_MODIFIED',
STALE = 'STALE',
LOCALLY_MODIFIED_AND_STALE = 'LOCALLY_MODIFIED_AND_STALE',
UP_TO_DATE = 'UP_TO_DATE'
}
export interface ProcessGroupStatusSnapshot extends BaseSnapshot {
connectionStatusSnapshots: ConnectionStatusSnapshotEntity[];
processorStatusSnapshots: ProcessorStatusSnapshotEntity[];
processGroupStatusSnapshots: ProcessGroupStatusSnapshotEntity[];
remoteProcessGroupStatusSnapshots: any[];
inputPortStatusSnapshots: any[];
outputPortStatusSnapshots: any[];
inputPortStatusSnapshots: PortStatusSnapshotEntity[];
outputPortStatusSnapshots: PortStatusSnapshotEntity[];
bytesRead: number;
bytesReceived: number;
@ -112,10 +143,17 @@ export interface ProcessGroupStatusSnapshot extends BaseSnapshot {
processingNanos: number;
statelessActiveThreadCount: number;
terminatedThreadCount: number;
versionedFlowState?: VersionedFlowState;
}
export interface AggregateSnapshot extends ProcessGroupStatusSnapshot {}
export interface PortStatusSnapshot extends BaseSnapshot {
runStatus: string;
groupId: string;
activeThreadCount: number;
}
export interface ProcessGroupStatusEntity {
canRead: boolean;
processGroupStatus: {
@ -135,10 +173,31 @@ export interface SelectProcessorStatusRequest {
id: string;
}
export interface SelectProcessGroupStatusRequest {
id: string;
}
export interface SelectPortStatusRequest {
id: string;
}
export interface SelectConnectionStatusRequest {
id: string;
}
export interface SelectRemoteProcessGroupStatusRequest {
id: string;
}
export interface SummaryListingState {
clusterSummary: ClusterSummaryEntity | null;
processGroupStatus: ProcessGroupStatusEntity | null;
processorStatusSnapshots: ProcessorStatusSnapshotEntity[];
processGroupStatusSnapshots: ProcessGroupStatusSnapshotEntity[];
inputPortStatusSnapshots: PortStatusSnapshotEntity[];
outputPortStatusSnapshots: PortStatusSnapshotEntity[];
connectionStatusSnapshots: ConnectionStatusSnapshotEntity[];
remoteProcessGroupStatusSnapshots: RemoteProcessGroupStatusSnapshotEntity[];
loadedTimestamp: string;
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';

View File

@ -16,7 +16,14 @@
*/
import { createAction, props } from '@ngrx/store';
import { SelectProcessorStatusRequest, SummaryListingResponse } from './index';
import {
SelectConnectionStatusRequest,
SelectPortStatusRequest,
SelectProcessGroupStatusRequest,
SelectProcessorStatusRequest,
SelectRemoteProcessGroupStatusRequest,
SummaryListingResponse
} from './index';
const SUMMARY_LISTING_PREFIX: string = '[Summary Listing]';
@ -40,9 +47,49 @@ export const selectProcessorStatus = createAction(
props<{ request: SelectProcessorStatusRequest }>()
);
export const selectProcessGroupStatus = createAction(
`${SUMMARY_LISTING_PREFIX} Select Process Group Status`,
props<{ request: SelectProcessGroupStatusRequest }>()
);
export const selectInputPortStatus = createAction(
`${SUMMARY_LISTING_PREFIX} Select Input Port Status`,
props<{ request: SelectPortStatusRequest }>()
);
export const selectOutputPortStatus = createAction(
`${SUMMARY_LISTING_PREFIX} Select Output Port Status`,
props<{ request: SelectPortStatusRequest }>()
);
export const selectConnectionStatus = createAction(
`${SUMMARY_LISTING_PREFIX} Select Connection Status`,
props<{ request: SelectConnectionStatusRequest }>()
);
export const selectRemoteProcessGroupStatus = createAction(
`${SUMMARY_LISTING_PREFIX} Select Remote Process Group Status`,
props<{ request: SelectRemoteProcessGroupStatusRequest }>()
);
export const navigateToViewProcessorStatusHistory = createAction(
`${SUMMARY_LISTING_PREFIX} Navigate To Processor Status History`,
props<{ id: string }>()
);
export const navigateToViewProcessGroupStatusHistory = createAction(
`${SUMMARY_LISTING_PREFIX} Navigate To Process Group Status History`,
props<{ id: string }>()
);
export const navigateToViewConnectionStatusHistory = createAction(
`${SUMMARY_LISTING_PREFIX} Navigate To Connection Status History`,
props<{ id: string }>()
);
export const navigateToViewRemoteProcessGroupStatusHistory = createAction(
`${SUMMARY_LISTING_PREFIX} Navigate To Remote Process Group Status History`,
props<{ id: string }>()
);
export const resetSummaryState = createAction(`${SUMMARY_LISTING_PREFIX} Reset Summary State`);

View File

@ -73,6 +73,66 @@ export class SummaryListingEffects {
{ dispatch: false }
);
selectProcessGroupStatus$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.selectProcessGroupStatus),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/summary', 'process-groups', request.id]);
})
),
{ dispatch: false }
);
selectInputPortStatus$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.selectInputPortStatus),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/summary', 'input-ports', request.id]);
})
),
{ dispatch: false }
);
selectOutputPortStatus$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.selectOutputPortStatus),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/summary', 'output-ports', request.id]);
})
),
{ dispatch: false }
);
selectConnectionStatus$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.selectConnectionStatus),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/summary', 'connections', request.id]);
})
),
{ dispatch: false }
);
selectRpgStatus$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.selectRemoteProcessGroupStatus),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/summary', 'remote-process-groups', request.id]);
})
),
{ dispatch: false }
);
navigateToProcessorStatusHistory$ = createEffect(
() =>
this.actions$.pipe(
@ -85,20 +145,88 @@ export class SummaryListingEffects {
{ dispatch: false }
);
completeProcessorStatusHistory$ = createEffect(
navigateToViewProcessGroupStatusHistory$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.navigateToViewProcessGroupStatusHistory),
map((action) => action.id),
tap((id) => {
this.router.navigate(['/summary', 'process-groups', id, 'history']);
})
),
{ dispatch: false }
);
navigateToViewConnectionStatusHistory$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.navigateToViewConnectionStatusHistory),
map((action) => action.id),
tap((id) => {
this.router.navigate(['/summary', 'connections', id, 'history']);
})
),
{ dispatch: false }
);
navigateToViewRpgStatusHistory$ = createEffect(
() =>
this.actions$.pipe(
ofType(SummaryListingActions.navigateToViewRemoteProcessGroupStatusHistory),
map((action) => action.id),
tap((id) => {
this.router.navigate(['/summary', 'remote-process-groups', id, 'history']);
})
),
{ dispatch: false }
);
// update the route to remove "/history", selecting the component in the summary list
completeStatusHistory$ = createEffect(
() =>
this.actions$.pipe(
ofType(StatusHistoryActions.viewStatusHistoryComplete),
map((action) => action.request),
filter((request) => request.source === 'summary' && request.componentType === ComponentType.Processor),
filter((request) => request.source === 'summary'),
tap((request) => {
this.store.dispatch(
SummaryListingActions.selectProcessorStatus({
request: {
id: request.componentId
}
})
);
switch (request.componentType) {
case ComponentType.ProcessGroup:
this.store.dispatch(
SummaryListingActions.selectProcessGroupStatus({
request: {
id: request.componentId
}
})
);
break;
case ComponentType.Connection:
this.store.dispatch(
SummaryListingActions.selectConnectionStatus({
request: {
id: request.componentId
}
})
);
break;
case ComponentType.RemoteProcessGroup:
this.store.dispatch(
SummaryListingActions.selectRemoteProcessGroupStatus({
request: {
id: request.componentId
}
})
);
break;
case ComponentType.Processor:
default:
this.store.dispatch(
SummaryListingActions.selectProcessorStatus({
request: {
id: request.componentId
}
})
);
}
})
),
{ dispatch: false }

View File

@ -16,7 +16,15 @@
*/
import { createReducer, on } from '@ngrx/store';
import { ProcessGroupStatusSnapshot, ProcessorStatusSnapshotEntity, SummaryListingState } from './index';
import {
ConnectionStatusSnapshotEntity,
PortStatusSnapshotEntity,
ProcessGroupStatusSnapshot,
ProcessGroupStatusSnapshotEntity,
ProcessorStatusSnapshotEntity,
RemoteProcessGroupStatusSnapshotEntity,
SummaryListingState
} from './index';
import {
loadSummaryListing,
loadSummaryListingSuccess,
@ -28,6 +36,11 @@ export const initialState: SummaryListingState = {
clusterSummary: null,
processGroupStatus: null,
processorStatusSnapshots: [],
processGroupStatusSnapshots: [],
inputPortStatusSnapshots: [],
outputPortStatusSnapshots: [],
connectionStatusSnapshots: [],
remoteProcessGroupStatusSnapshots: [],
status: 'pending',
error: null,
loadedTimestamp: ''
@ -46,6 +59,30 @@ export const summaryListingReducer = createReducer(
response.status.processGroupStatus.aggregateSnapshot
);
// get the root pg entity
const root: ProcessGroupStatusSnapshotEntity = {
id: response.status.processGroupStatus.id,
canRead: response.status.canRead,
processGroupStatusSnapshot: response.status.processGroupStatus.aggregateSnapshot
};
const childProcessGroups: ProcessGroupStatusSnapshotEntity[] = flattenProcessGroupStatusSnapshots(
response.status.processGroupStatus.aggregateSnapshot
);
const inputPorts: PortStatusSnapshotEntity[] = flattenInputPortStatusSnapshots(
response.status.processGroupStatus.aggregateSnapshot
);
const outputPorts: PortStatusSnapshotEntity[] = flattenOutputPortStatusSnapshots(
response.status.processGroupStatus.aggregateSnapshot
);
const connections: ConnectionStatusSnapshotEntity[] = flattenConnectionStatusSnapshots(
response.status.processGroupStatus.aggregateSnapshot
);
const rpgs: RemoteProcessGroupStatusSnapshotEntity[] = flattenRpgStatusSnapshots(
response.status.processGroupStatus.aggregateSnapshot
);
return {
...state,
error: null,
@ -53,7 +90,12 @@ export const summaryListingReducer = createReducer(
loadedTimestamp: response.status.processGroupStatus.statsLastRefreshed,
processGroupStatus: response.status,
clusterSummary: response.clusterSummary,
processorStatusSnapshots: processors
processorStatusSnapshots: processors,
processGroupStatusSnapshots: [root, ...childProcessGroups],
inputPortStatusSnapshots: inputPorts,
outputPortStatusSnapshots: outputPorts,
connectionStatusSnapshots: connections,
remoteProcessGroupStatusSnapshots: rpgs
};
}),
@ -94,3 +136,68 @@ function flattenProcessorStatusSnapshots(
return processors;
}
}
function flattenProcessGroupStatusSnapshots(snapshot: ProcessGroupStatusSnapshot): ProcessGroupStatusSnapshotEntity[] {
const processGroups = [...snapshot.processGroupStatusSnapshots];
if (snapshot.processGroupStatusSnapshots?.length > 0) {
const children = snapshot.processGroupStatusSnapshots
.map((pg) => pg.processGroupStatusSnapshot)
.flatMap((pg) => flattenProcessGroupStatusSnapshots(pg));
return [...processGroups, ...children];
} else {
return [...processGroups];
}
}
function flattenInputPortStatusSnapshots(snapshot: ProcessGroupStatusSnapshot): PortStatusSnapshotEntity[] {
const ports = [...snapshot.inputPortStatusSnapshots];
if (snapshot.processGroupStatusSnapshots?.length > 0) {
const children = snapshot.processGroupStatusSnapshots
.map((pg) => pg.processGroupStatusSnapshot)
.flatMap((pg) => flattenInputPortStatusSnapshots(pg));
return [...ports, ...children];
} else {
return ports;
}
}
function flattenOutputPortStatusSnapshots(snapshot: ProcessGroupStatusSnapshot): PortStatusSnapshotEntity[] {
const ports = [...snapshot.outputPortStatusSnapshots];
if (snapshot.processGroupStatusSnapshots?.length > 0) {
const children = snapshot.processGroupStatusSnapshots
.map((pg) => pg.processGroupStatusSnapshot)
.flatMap((pg) => flattenOutputPortStatusSnapshots(pg));
return [...ports, ...children];
} else {
return ports;
}
}
function flattenConnectionStatusSnapshots(snapshot: ProcessGroupStatusSnapshot): ConnectionStatusSnapshotEntity[] {
const connections = [...snapshot.connectionStatusSnapshots];
if (snapshot.processGroupStatusSnapshots?.length > 0) {
const children = snapshot.processGroupStatusSnapshots
.map((pg) => pg.processGroupStatusSnapshot)
.flatMap((pg) => flattenConnectionStatusSnapshots(pg));
return [...connections, ...children];
} else {
return connections;
}
}
function flattenRpgStatusSnapshots(snapshot: ProcessGroupStatusSnapshot): RemoteProcessGroupStatusSnapshotEntity[] {
const rpgs = [...snapshot.remoteProcessGroupStatusSnapshots];
if (snapshot.processGroupStatusSnapshots?.length > 0) {
const children = snapshot.processGroupStatusSnapshots
.map((pg) => pg.processGroupStatusSnapshot)
.flatMap((pg) => flattenRpgStatusSnapshots(pg));
return [...rpgs, ...children];
} else {
return rpgs;
}
}

View File

@ -17,7 +17,14 @@
import { createSelector } from '@ngrx/store';
import { selectSummaryState, SummaryState } from '../index';
import { ProcessorStatusSnapshotEntity, summaryListingFeatureKey, SummaryListingState } from './index';
import {
ConnectionStatusSnapshotEntity,
ProcessGroupStatusSnapshotEntity,
ProcessorStatusSnapshotEntity,
RemoteProcessGroupStatusSnapshotEntity,
summaryListingFeatureKey,
SummaryListingState
} from './index';
import { selectCurrentRoute } from '../../../../state/router/router.selectors';
export const selectSummaryListing = createSelector(
@ -55,6 +62,16 @@ export const selectProcessorStatus = (id: string) =>
processors.find((processor) => id === processor.id)
);
export const selectConnectionStatus = (id: string) =>
createSelector(selectConnectionStatusSnapshots, (connections: ConnectionStatusSnapshotEntity[]) =>
connections.find((connection) => id === connection.id)
);
export const selectRemoteProcessGroupStatus = (id: string) =>
createSelector(selectRemoteProcessGroupStatusSnapshots, (rpgs: RemoteProcessGroupStatusSnapshotEntity[]) =>
rpgs.find((rpg) => id === rpg.id)
);
export const selectProcessorIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
@ -67,3 +84,68 @@ export const selectViewStatusHistory = createSelector(selectCurrentRoute, (route
return route.params.id;
}
});
export const selectProcessGroupStatusSnapshots = createSelector(
selectSummaryListing,
(state: SummaryListingState) => state.processGroupStatusSnapshots
);
export const selectProcessGroupStatusItem = (id: string) =>
createSelector(selectProcessGroupStatusSnapshots, (pgs: ProcessGroupStatusSnapshotEntity[]) =>
pgs.find((pg) => id === pg.id)
);
export const selectProcessGroupIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
}
return null;
});
export const selectInputPortIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
}
return null;
});
export const selectOutputPortIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
}
return null;
});
export const selectConnectionIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
}
return null;
});
export const selectRemoteProcessGroupIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route) {
return route.params.id;
}
return null;
});
export const selectInputPortStatusSnapshots = createSelector(
selectSummaryListing,
(state: SummaryListingState) => state.inputPortStatusSnapshots
);
export const selectOutputPortStatusSnapshots = createSelector(
selectSummaryListing,
(state: SummaryListingState) => state.outputPortStatusSnapshots
);
export const selectConnectionStatusSnapshots = createSelector(
selectSummaryListing,
(state: SummaryListingState) => state.connectionStatusSnapshots
);
export const selectRemoteProcessGroupStatusSnapshots = createSelector(
selectSummaryListing,
(state: SummaryListingState) => state.remoteProcessGroupStatusSnapshots
);

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Sort } from '@angular/material/sort';
export interface MultiSort extends Sort {
sortValueIndex: number;
totalValues: number;
}

View File

@ -0,0 +1,147 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="port-status-table h-full flex flex-col">
<!-- allow filtering of the table -->
<summary-table-filter
[filteredCount]="filteredCount"
[totalCount]="totalCount"
[filterableColumns]="filterableColumns"
[includeStatusFilter]="true"
[includePrimaryNodeOnlyFilter]="false"
(filterChanged)="applyFilter($event)"></summary-table-filter>
<div class="flex-1 relative">
<div class="listing-table overflow-y-auto border absolute inset-0">
<table
mat-table
[dataSource]="dataSource"
matSort
matSortDisableClear
(matSortChange)="sortData($event)"
[matSortActive]="initialSortColumn"
[matSortDirection]="initialSortDirection">
<!-- More Details Column -->
<ng-container matColumnDef="moreDetails">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item"></td>
</ng-container>
<!-- Name Column -->
<ng-container matColumnDef="name">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Name</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatName(item)">
{{ formatName(item) }}
</td>
</ng-container>
<!-- Run Status column -->
<ng-container matColumnDef="runStatus">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Run Status</th>
<td mat-cell *matCellDef="let item">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1.5">
<span [ngClass]="getRunStatusIcon(item)"></span>
<span [title]="formatRunStatus(item)">{{ formatRunStatus(item) }}</span>
<ng-container *ngIf="item.processorStatusSnapshot as pg">
<span
*ngIf="pg.terminatedThreadCount > 0; else activeThreads"
title="Threads: (Active / Terminated)"
>({{ pg.activeThreadCount }}/{{ pg.terminatedThreadCount }})</span
>
<ng-template #activeThreads>
<span *ngIf="pg.activeThreadCount > 0" title="Active Threads"
>({{ pg.activeThreadCount }})</span
>
</ng-template>
</ng-container>
</div>
</td>
</ng-container>
<!-- Input column -->
<ng-container matColumnDef="in">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 0 }"
>In</span
>
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatIn(item)">
{{ formatIn(item) }}
</td>
</ng-container>
<!-- Output column -->
<ng-container matColumnDef="out">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 0 }">
Out
</span>
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 1 }">
(Size)
</span>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatOut(item)">
{{ formatOut(item) }}
</td>
</ng-container>
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3">
<div
class="pointer fa fa-long-arrow-right"
[routerLink]="getPortLink(item)"
(click)="$event.stopPropagation()"
title="Go to {{ portType }}} port"></div>
</div>
</td>
</ng-container>
<tr mat-header-row *matHeaderRowDef="displayedColumns; sticky: true"></tr>
<tr
mat-row
*matRowDef="let row; let even = even; columns: displayedColumns"
[class.even]="even"
(click)="select(row)"
[class.selected]="isSelected(row)"></tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,30 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.port-status-table {
.listing-table {
.mat-column-moreDetails {
width: 32px;
min-width: 32px;
}
.mat-column-actions {
width: 72px;
min-width: 72px;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { PortStatusTable } from './port-status-table.component';
import { SummaryTableFilterModule } from '../summary-table-filter/summary-table-filter.module';
import { MatSortModule } from '@angular/material/sort';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
describe('PortStatusTable', () => {
let component: PortStatusTable;
let fixture: ComponentFixture<PortStatusTable>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [PortStatusTable, SummaryTableFilterModule, MatSortModule, NoopAnimationsModule]
});
fixture = TestBed.createComponent(PortStatusTable);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, EventEmitter, Input, Output } from '@angular/core';
import { CommonModule } from '@angular/common';
import { MatSortModule, Sort, SortDirection } from '@angular/material/sort';
import { MultiSort } from '../index';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { PortStatusSnapshot, PortStatusSnapshotEntity } from '../../../state/summary-listing';
import { SummaryTableFilterModule } from '../summary-table-filter/summary-table-filter.module';
import {
SummaryTableFilterArgs,
SummaryTableFilterColumn
} from '../summary-table-filter/summary-table-filter.component';
import { ComponentType } from '../../../../../state/shared';
import { RouterLink } from '@angular/router';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
export type SupportedColumns = 'name' | 'runStatus' | 'in' | 'out';
@Component({
selector: 'port-status-table',
standalone: true,
imports: [CommonModule, SummaryTableFilterModule, MatSortModule, MatTableModule, RouterLink],
templateUrl: './port-status-table.component.html',
styleUrls: ['./port-status-table.component.scss', '../../../../../../assets/styles/listing-table.scss']
})
export class PortStatusTable {
private _initialSortColumn: SupportedColumns = 'name';
private _initialSortDirection: SortDirection = 'asc';
private _portType!: 'input' | 'output';
filterableColumns: SummaryTableFilterColumn[] = [{ key: 'name', label: 'name' }];
totalCount: number = 0;
filteredCount: number = 0;
multiSort: MultiSort = {
active: this._initialSortColumn,
direction: this._initialSortDirection,
sortValueIndex: 0,
totalValues: 2
};
displayedColumns: string[] = [];
dataSource: MatTableDataSource<PortStatusSnapshotEntity> = new MatTableDataSource<PortStatusSnapshotEntity>();
constructor(private nifiCommon: NiFiCommon) {}
@Input() set portType(type: 'input' | 'output') {
if (type === 'input') {
this.displayedColumns = ['moreDetails', 'name', 'runStatus', 'in', 'actions'];
} else {
this.displayedColumns = ['moreDetails', 'name', 'runStatus', 'out', 'actions'];
}
this._portType = type;
}
get portType() {
return this._portType;
}
@Input() selectedPortId!: string;
@Input() set initialSortColumn(initialSortColumn: SupportedColumns) {
this._initialSortColumn = initialSortColumn;
this.multiSort = { ...this.multiSort, active: initialSortColumn };
}
get initialSortColumn() {
return this._initialSortColumn;
}
@Input() set initialSortDirection(initialSortDirection: SortDirection) {
this._initialSortDirection = initialSortDirection;
this.multiSort = { ...this.multiSort, direction: initialSortDirection };
}
get initialSortDirection() {
return this._initialSortDirection;
}
@Input() set ports(ports: PortStatusSnapshotEntity[]) {
if (ports) {
this.dataSource.data = this.sortEntities(ports, this.multiSort);
this.dataSource.filterPredicate = (data: PortStatusSnapshotEntity, filter: string) => {
const { filterTerm, filterColumn, filterStatus } = JSON.parse(filter);
const matchOnStatus: boolean = filterStatus !== 'All';
if (matchOnStatus) {
if (data.portStatusSnapshot.runStatus !== filterStatus) {
return false;
}
}
if (filterTerm === '') {
return true;
}
const field: string = data.portStatusSnapshot[filterColumn as keyof PortStatusSnapshot] as string;
return this.nifiCommon.stringContains(field, filterTerm, true);
};
this.totalCount = ports.length;
this.filteredCount = ports.length;
}
}
@Output() selectPort: EventEmitter<PortStatusSnapshotEntity> = new EventEmitter<PortStatusSnapshotEntity>();
applyFilter(filter: SummaryTableFilterArgs) {
this.dataSource.filter = JSON.stringify(filter);
this.filteredCount = this.dataSource.filteredData.length;
}
formatName(port: PortStatusSnapshotEntity): string {
return port.portStatusSnapshot.name;
}
formatRunStatus(port: PortStatusSnapshotEntity): string {
return port.portStatusSnapshot.runStatus;
}
formatIn(port: PortStatusSnapshotEntity): string {
return port.portStatusSnapshot.input;
}
formatOut(port: PortStatusSnapshotEntity): string {
return port.portStatusSnapshot.output;
}
getRunStatusIcon(port: PortStatusSnapshotEntity): string {
switch (port.portStatusSnapshot.runStatus.toLowerCase()) {
case 'running':
return 'fa fa-play running';
case 'stopped':
return 'fa fa-stop stopped';
case 'enabled':
return 'fa fa-flash enabled';
case 'disabled':
return 'icon icon-enable-false disabled';
case 'validating':
return 'fa fa-spin fa-circle-notch validating';
case 'invalid':
return 'fa fa-warning invalid';
default:
return '';
}
}
getPortLink(port: PortStatusSnapshotEntity): string[] {
const componentType: ComponentType =
this._portType === 'input' ? ComponentType.InputPort : ComponentType.OutputPort;
return ['/process-groups', port.portStatusSnapshot.groupId, componentType, port.id];
}
select(port: PortStatusSnapshotEntity): void {
this.selectPort.next(port);
}
isSelected(port: PortStatusSnapshotEntity): boolean {
if (this.selectedPortId) {
return port.id === this.selectedPortId;
}
return false;
}
sortData(sort: Sort) {
this.setMultiSort(sort);
this.dataSource.data = this.sortEntities(this.dataSource.data, sort);
}
canRead(port: PortStatusSnapshotEntity) {
return port.canRead;
}
private supportsMultiValuedSort(sort: Sort): boolean {
switch (sort.active) {
case 'in':
case 'out':
return true;
default:
return false;
}
}
private setMultiSort(sort: Sort) {
const { active, direction, sortValueIndex, totalValues } = this.multiSort;
if (this.supportsMultiValuedSort(sort)) {
if (active === sort.active) {
// previous sort was of the same column
if (direction === 'desc' && sort.direction === 'asc') {
// change from previous index to the next
const newIndex = sortValueIndex + 1 >= totalValues ? 0 : sortValueIndex + 1;
this.multiSort = { ...sort, sortValueIndex: newIndex, totalValues };
} else {
this.multiSort = { ...sort, sortValueIndex, totalValues };
}
} else {
// sorting a different column, just reset
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
} else {
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
}
private sortEntities(data: PortStatusSnapshotEntity[], sort: Sort): PortStatusSnapshotEntity[] {
if (!data) {
return [];
}
return data.slice().sort((a, b) => {
const isAsc: boolean = sort.direction === 'asc';
let retVal: number = 0;
switch (sort.active) {
case 'name':
retVal = this.nifiCommon.compareString(a.portStatusSnapshot.name, b.portStatusSnapshot.name);
break;
case 'runStatus':
retVal = this.nifiCommon.compareString(
a.portStatusSnapshot.runStatus,
b.portStatusSnapshot.runStatus
);
break;
case 'in':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.portStatusSnapshot.flowFilesIn,
b.portStatusSnapshot.flowFilesIn
);
} else {
retVal = this.nifiCommon.compareNumber(
a.portStatusSnapshot.bytesIn,
b.portStatusSnapshot.bytesIn
);
}
break;
case 'out':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.portStatusSnapshot.flowFilesOut,
b.portStatusSnapshot.flowFilesOut
);
} else {
retVal = this.nifiCommon.compareNumber(
a.portStatusSnapshot.bytesOut,
b.portStatusSnapshot.bytesOut
);
}
break;
default:
retVal = 0;
}
return retVal * (isAsc ? 1 : -1);
});
}
}

View File

@ -30,7 +30,7 @@
<mat-label>Filter By</mat-label>
<mat-select formControlName="filterColumn">
<ng-container *ngFor="let option of filterableColumns">
<mat-option [value]="option"> {{ option }} </mat-option>
<mat-option [value]="option.key"> {{ option.label }} </mat-option>
</ng-container>
</mat-select>
</mat-form-field>

View File

@ -19,6 +19,10 @@ import { AfterViewInit, Component, EventEmitter, Input, Output } from '@angular/
import { FormBuilder, FormGroup } from '@angular/forms';
import { debounceTime } from 'rxjs';
export interface SummaryTableFilterColumn {
key: string;
label: string;
}
export interface SummaryTableFilterArgs {
filterTerm: string;
filterColumn: string;
@ -35,8 +39,9 @@ export class SummaryTableFilter implements AfterViewInit {
filterForm: FormGroup;
private _filteredCount: number = 0;
private _totalCount: number = 0;
private _initialFilterColumn: string = 'name';
@Input() filterableColumns: string[] = [];
@Input() filterableColumns: SummaryTableFilterColumn[] = [];
@Input() includeStatusFilter: boolean = false;
@Input() includePrimaryNodeOnlyFilter: boolean = false;
@Output() filterChanged: EventEmitter<SummaryTableFilterArgs> = new EventEmitter<SummaryTableFilterArgs>();
@ -45,14 +50,15 @@ export class SummaryTableFilter implements AfterViewInit {
this.filterForm.get('filterTerm')?.value(term);
}
@Input() set filterColumn(column: string) {
this._initialFilterColumn = column;
if (this.filterableColumns?.length > 0) {
if (this.filterableColumns.indexOf(column) >= 0) {
this.filterForm.get('filterColumn')?.value(column);
if (this.filterableColumns.findIndex((col) => col.key === column) >= 0) {
this.filterForm.get('filterColumn')?.setValue(column);
} else {
this.filterForm.get('filterColumn')?.value(this.filterableColumns[0]);
this.filterForm.get('filterColumn')?.setValue(this.filterableColumns[0].key);
}
} else {
this.filterForm.get('filterColumn')?.value(this.filterableColumns[0]);
this.filterForm.get('filterColumn')?.setValue(this._initialFilterColumn);
}
}
@ -81,7 +87,7 @@ export class SummaryTableFilter implements AfterViewInit {
constructor(private formBuilder: FormBuilder) {
this.filterForm = this.formBuilder.group({
filterTerm: '',
filterColumn: 'name',
filterColumn: this._initialFilterColumn || 'name',
filterStatus: 'All',
primaryOnly: false
});

View File

@ -15,4 +15,33 @@
~ limitations under the License.
-->
<p>connection-status-listing works!</p>
<ng-container>
<div *ngIf="isInitialLoading((loadedTimestamp$ | async)!); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</div>
<ng-template #loaded>
<div class="flex flex-col h-full gap-y-2">
<div class="flex-1" *ngIf="currentUser$ | async as user">
<ng-container>
<connection-status-table
[connections]="(connectionStatusSnapshots$ | async)!"
[selectedConnectionId]="selectedConnectionId$ | async"
(selectConnection)="selectConnection($event)"
(viewStatusHistory)="viewStatusHistory($event)"
initialSortColumn="sourceName"
initialSortDirection="asc"></connection-status-table>
</ng-container>
</div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshSummaryListing()">
<i class="fa fa-refresh" [class.fa-spin]="(summaryListingStatus$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>
</ng-template>
</ng-container>

View File

@ -18,6 +18,11 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ConnectionStatusListing } from './connection-status-listing.component';
import { ConnectionStatusTable } from './connection-status-table/connection-status-table.component';
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('ConnectionStatusListing', () => {
let component: ConnectionStatusListing;
@ -25,7 +30,9 @@ describe('ConnectionStatusListing', () => {
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [ConnectionStatusListing]
declarations: [ConnectionStatusListing],
imports: [ConnectionStatusTable, SummaryTableFilterModule, NoopAnimationsModule],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(ConnectionStatusListing);
component = fixture.componentInstance;

View File

@ -16,10 +16,92 @@
*/
import { Component } from '@angular/core';
import { Store } from '@ngrx/store';
import {
ConnectionStatusSnapshotEntity,
PortStatusSnapshotEntity,
SummaryListingState
} from '../../state/summary-listing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
import * as SummaryListingActions from '../../state/summary-listing/summary-listing.actions';
import {
selectConnectionIdFromRoute,
selectConnectionStatus,
selectConnectionStatusSnapshots,
selectProcessorStatus,
selectSummaryListingLoadedTimestamp,
selectSummaryListingStatus,
selectViewStatusHistory
} from '../../state/summary-listing/summary-listing.selectors';
import { selectUser } from '../../../../state/user/user.selectors';
import { filter, switchMap, take } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { openStatusHistoryDialog } from '../../../../state/status-history/status-history.actions';
import { ComponentType } from '../../../../state/shared';
@Component({
selector: 'connection-status-listing',
templateUrl: './connection-status-listing.component.html',
styleUrls: ['./connection-status-listing.component.scss']
})
export class ConnectionStatusListing {}
export class ConnectionStatusListing {
loadedTimestamp$ = this.store.select(selectSummaryListingLoadedTimestamp);
summaryListingStatus$ = this.store.select(selectSummaryListingStatus);
currentUser$ = this.store.select(selectUser);
connectionStatusSnapshots$ = this.store.select(selectConnectionStatusSnapshots);
selectedConnectionId$ = this.store.select(selectConnectionIdFromRoute);
constructor(private store: Store<SummaryListingState>) {
this.store
.select(selectViewStatusHistory)
.pipe(
filter((id: string) => !!id),
switchMap((id: string) =>
this.store.select(selectConnectionStatus(id)).pipe(
filter((connection) => !!connection),
take(1)
)
),
takeUntilDestroyed()
)
.subscribe((connection) => {
if (connection) {
this.store.dispatch(
openStatusHistoryDialog({
request: {
source: 'summary',
componentType: ComponentType.Connection,
componentId: connection.id
}
})
);
}
});
}
isInitialLoading(loadedTimestamp: string): boolean {
return loadedTimestamp == initialState.loadedTimestamp;
}
refreshSummaryListing() {
this.store.dispatch(SummaryListingActions.loadSummaryListing({ recursive: true }));
}
selectConnection(connection: ConnectionStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.selectConnectionStatus({
request: {
id: connection.id
}
})
);
}
viewStatusHistory(connection: ConnectionStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.navigateToViewConnectionStatusHistory({
id: connection.id
})
);
}
}

View File

@ -18,10 +18,13 @@
import { NgModule } from '@angular/core';
import { ConnectionStatusListing } from './connection-status-listing.component';
import { CommonModule } from '@angular/common';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { PortStatusTable } from '../common/port-status-table/port-status-table.component';
import { ConnectionStatusTable } from './connection-status-table/connection-status-table.component';
@NgModule({
declarations: [ConnectionStatusListing],
exports: [ConnectionStatusListing],
imports: [CommonModule]
imports: [CommonModule, NgxSkeletonLoaderModule, PortStatusTable, ConnectionStatusTable]
})
export class ConnectionStatusListingModule {}

View File

@ -0,0 +1,205 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="connection-status-table h-full flex flex-col">
<!-- allow filtering of the table -->
<summary-table-filter
[filteredCount]="filteredCount"
[totalCount]="totalCount"
[filterableColumns]="filterableColumns"
[includeStatusFilter]="false"
[includePrimaryNodeOnlyFilter]="false"
filterColumn="sourceName"
(filterChanged)="applyFilter($event)"></summary-table-filter>
<div class="flex-1 relative">
<div class="listing-table overflow-y-auto border absolute inset-0">
<table
mat-table
[dataSource]="dataSource"
matSort
matSortDisableClear
(matSortChange)="sortData($event)"
[matSortActive]="initialSortColumn"
[matSortDirection]="initialSortDirection">
<!-- More Details Column -->
<ng-container matColumnDef="moreDetails">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item"></td>
</ng-container>
<!-- Name Column -->
<ng-container matColumnDef="name">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Name</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatName(item)">
{{ formatName(item) }}
</td>
</ng-container>
<!-- Queued column -->
<ng-container matColumnDef="queue">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'queue' && multiSort.sortValueIndex === 0
}"
>Queue</span
>
<span
[ngClass]="{
underline: multiSort.active === 'queue' && multiSort.sortValueIndex === 1
}"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatQueue(item)">
{{ formatQueue(item) }}
</td>
</ng-container>
<!-- Threshold column -->
<ng-container matColumnDef="threshold">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Percent of threshold used for count and data size">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span>Threshold %:</span>
<span
[ngClass]="{
underline: multiSort.active === 'threshold' && multiSort.sortValueIndex === 0
}"
>Queue</span
>
<span>|</span>
<span
[ngClass]="{
underline: multiSort.active === 'threshold' && multiSort.sortValueIndex === 1
}"
>Size</span
>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatThreshold(item)">
{{ formatThreshold(item) }}
</td>
</ng-container>
<!-- Input column -->
<ng-container matColumnDef="in">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 0 }"
>In</span
>
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatIn(item)">
{{ formatIn(item) }}
</td>
</ng-container>
<!-- Source Column -->
<ng-container matColumnDef="sourceName">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">From Source</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatSource(item)">
{{ formatSource(item) }}
</td>
</ng-container>
<!-- Output column -->
<ng-container matColumnDef="out">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 0 }"
>Out</span
>
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatOut(item)">
{{ formatOut(item) }}
</td>
</ng-container>
<!-- Destination Column -->
<ng-container matColumnDef="destinationName">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">To Destination</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatDestination(item)">
{{ formatDestination(item) }}
</td>
</ng-container>
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3">
<div
class="pointer fa fa-long-arrow-right"
[routerLink]="getConnectionLink(item)"
(click)="$event.stopPropagation()"
title="Go to connection"></div>
<div
class="pointer fa fa-area-chart"
title="View Status History"
(click)="viewStatusHistoryClicked($event, item)"></div>
</div>
</td>
</ng-container>
<tr mat-header-row *matHeaderRowDef="displayedColumns; sticky: true"></tr>
<tr
mat-row
*matRowDef="let row; let even = even; columns: displayedColumns"
[class.even]="even"
(click)="select(row)"
[class.selected]="isSelected(row)"></tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,30 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.connection-status-table {
.listing-table {
.mat-column-moreDetails {
width: 32px;
min-width: 32px;
}
.mat-column-actions {
width: 72px;
min-width: 72px;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ConnectionStatusTable } from './connection-status-table.component';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule } from '@angular/material/sort';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
describe('ConnectionStatusTable', () => {
let component: ConnectionStatusTable;
let fixture: ComponentFixture<ConnectionStatusTable>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [ConnectionStatusTable, SummaryTableFilterModule, MatSortModule, NoopAnimationsModule]
});
fixture = TestBed.createComponent(ConnectionStatusTable);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, EventEmitter, Input, Output } from '@angular/core';
import { CommonModule } from '@angular/common';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule, Sort, SortDirection } from '@angular/material/sort';
import { MultiSort } from '../../common';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
import {
SummaryTableFilterArgs,
SummaryTableFilterColumn
} from '../../common/summary-table-filter/summary-table-filter.component';
import { ConnectionStatusSnapshot, ConnectionStatusSnapshotEntity } from '../../../state/summary-listing';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { ComponentType } from '../../../../../state/shared';
import { RouterLink } from '@angular/router';
export type SupportedColumns = 'name' | 'queue' | 'in' | 'out' | 'threshold' | 'sourceName' | 'destinationName';
@Component({
selector: 'connection-status-table',
standalone: true,
imports: [CommonModule, SummaryTableFilterModule, MatSortModule, RouterLink, MatTableModule],
templateUrl: './connection-status-table.component.html',
styleUrls: ['./connection-status-table.component.scss', '../../../../../../assets/styles/listing-table.scss']
})
export class ConnectionStatusTable {
private _initialSortColumn: SupportedColumns = 'sourceName';
private _initialSortDirection: SortDirection = 'asc';
filterableColumns: SummaryTableFilterColumn[] = [
{ key: 'sourceName', label: 'source' },
{ key: 'name', label: 'name' },
{ key: 'destinationName', label: 'destination' }
];
totalCount: number = 0;
filteredCount: number = 0;
multiSort: MultiSort = {
active: this._initialSortColumn,
direction: this._initialSortDirection,
sortValueIndex: 0,
totalValues: 2
};
displayedColumns: string[] = [
'moreDetails',
'name',
'queue',
'threshold',
'in',
'sourceName',
'out',
'destinationName',
'actions'
];
dataSource: MatTableDataSource<ConnectionStatusSnapshotEntity> =
new MatTableDataSource<ConnectionStatusSnapshotEntity>();
constructor(private nifiCommon: NiFiCommon) {}
@Input() set initialSortColumn(initialSortColumn: SupportedColumns) {
this._initialSortColumn = initialSortColumn;
this.multiSort = { ...this.multiSort, active: initialSortColumn };
}
get initialSortColumn() {
return this._initialSortColumn;
}
@Input() set initialSortDirection(initialSortDirection: SortDirection) {
this._initialSortDirection = initialSortDirection;
this.multiSort = { ...this.multiSort, direction: initialSortDirection };
}
get initialSortDirection() {
return this._initialSortDirection;
}
@Input() selectedConnectionId!: string;
@Input() set connections(connections: ConnectionStatusSnapshotEntity[]) {
if (connections) {
this.dataSource.data = this.sortEntities(connections, this.multiSort);
this.dataSource.filterPredicate = (data: ConnectionStatusSnapshotEntity, filter: string) => {
const { filterTerm, filterColumn } = JSON.parse(filter);
if (filterTerm === '') {
return true;
}
const field: string = data.connectionStatusSnapshot[
filterColumn as keyof ConnectionStatusSnapshot
] as string;
return this.nifiCommon.stringContains(field, filterTerm, true);
};
this.totalCount = connections.length;
this.filteredCount = connections.length;
}
}
@Output() viewStatusHistory: EventEmitter<ConnectionStatusSnapshotEntity> =
new EventEmitter<ConnectionStatusSnapshotEntity>();
@Output() selectConnection: EventEmitter<ConnectionStatusSnapshotEntity> =
new EventEmitter<ConnectionStatusSnapshotEntity>();
applyFilter(filter: SummaryTableFilterArgs) {
this.dataSource.filter = JSON.stringify(filter);
this.filteredCount = this.dataSource.filteredData.length;
}
getConnectionLink(connection: ConnectionStatusSnapshotEntity): string[] {
return [
'/process-groups',
connection.connectionStatusSnapshot.groupId,
ComponentType.Connection,
connection.id
];
}
select(connection: ConnectionStatusSnapshotEntity): void {
this.selectConnection.next(connection);
}
isSelected(connection: ConnectionStatusSnapshotEntity): boolean {
if (this.selectedConnectionId) {
return connection.id === this.selectedConnectionId;
}
return false;
}
canRead(connection: ConnectionStatusSnapshotEntity): boolean {
return connection.canRead;
}
sortData(sort: Sort) {
this.setMultiSort(sort);
this.dataSource.data = this.sortEntities(this.dataSource.data, sort);
}
formatName(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.name;
}
formatSource(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.sourceName;
}
formatDestination(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.destinationName;
}
formatIn(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.input;
}
formatOut(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.output;
}
formatQueue(connection: ConnectionStatusSnapshotEntity): string {
return connection.connectionStatusSnapshot.queued;
}
formatThreshold(connection: ConnectionStatusSnapshotEntity): string {
return `${connection.connectionStatusSnapshot.percentUseCount}% | ${connection.connectionStatusSnapshot.percentUseBytes}%`;
}
viewStatusHistoryClicked(event: MouseEvent, connection: ConnectionStatusSnapshotEntity): void {
event.stopPropagation();
this.viewStatusHistory.next(connection);
}
private supportsMultiValuedSort(sort: Sort): boolean {
switch (sort.active) {
case 'in':
case 'out':
case 'threshold':
case 'queue':
return true;
default:
return false;
}
}
private setMultiSort(sort: Sort) {
const { active, direction, sortValueIndex, totalValues } = this.multiSort;
if (this.supportsMultiValuedSort(sort)) {
if (active === sort.active) {
// previous sort was of the same column
if (direction === 'desc' && sort.direction === 'asc') {
// change from previous index to the next
const newIndex = sortValueIndex + 1 >= totalValues ? 0 : sortValueIndex + 1;
this.multiSort = { ...sort, sortValueIndex: newIndex, totalValues };
} else {
this.multiSort = { ...sort, sortValueIndex, totalValues };
}
} else {
// sorting a different column, just reset
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
} else {
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
}
private sortEntities(data: ConnectionStatusSnapshotEntity[], sort: Sort): ConnectionStatusSnapshotEntity[] {
if (!data) {
return [];
}
return data.slice().sort((a, b) => {
const isAsc: boolean = sort.direction === 'asc';
let retVal: number = 0;
switch (sort.active) {
case 'name':
retVal = this.nifiCommon.compareString(
a.connectionStatusSnapshot.name,
b.connectionStatusSnapshot.name
);
break;
case 'sourceName':
retVal = this.nifiCommon.compareString(
a.connectionStatusSnapshot.sourceName,
b.connectionStatusSnapshot.sourceName
);
break;
case 'destinationName':
retVal = this.nifiCommon.compareString(
a.connectionStatusSnapshot.destinationName,
b.connectionStatusSnapshot.destinationName
);
break;
case 'in':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.flowFilesIn,
b.connectionStatusSnapshot.flowFilesIn
);
} else {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.bytesIn,
b.connectionStatusSnapshot.bytesIn
);
}
break;
case 'out':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.flowFilesOut,
b.connectionStatusSnapshot.flowFilesOut
);
} else {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.bytesOut,
b.connectionStatusSnapshot.bytesOut
);
}
break;
case 'queue':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.flowFilesQueued,
b.connectionStatusSnapshot.flowFilesQueued
);
} else {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.bytesQueued,
b.connectionStatusSnapshot.bytesQueued
);
}
break;
case 'threshold':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.percentUseCount,
b.connectionStatusSnapshot.percentUseCount
);
} else {
retVal = this.nifiCommon.compareNumber(
a.connectionStatusSnapshot.percentUseBytes,
b.connectionStatusSnapshot.percentUseBytes
);
}
break;
default:
retVal = 0;
}
return retVal * (isAsc ? 1 : -1);
});
}
}

View File

@ -15,4 +15,33 @@
~ limitations under the License.
-->
<p>input-port-status-listing works!</p>
<ng-container>
<div *ngIf="isInitialLoading((loadedTimestamp$ | async)!); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</div>
<ng-template #loaded>
<div class="flex flex-col h-full gap-y-2">
<div class="flex-1" *ngIf="currentUser$ | async as user">
<ng-container>
<port-status-table
[ports]="(portStatusSnapshots$ | async)!"
[selectedPortId]="selectedPortId$ | async"
portType="input"
(selectPort)="selectPort($event)"
initialSortColumn="name"
initialSortDirection="asc"></port-status-table>
</ng-container>
</div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshSummaryListing()">
<i class="fa fa-refresh" [class.fa-spin]="(summaryListingStatus$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>
</ng-template>
</ng-container>

View File

@ -18,6 +18,15 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { InputPortStatusListing } from './input-port-status-listing.component';
import { CommonModule } from '@angular/common';
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule } from '@angular/material/sort';
import { MatTableModule } from '@angular/material/table';
import { RouterLink } from '@angular/router';
import { PortStatusTable } from '../common/port-status-table/port-status-table.component';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('InputPortStatusListing', () => {
let component: InputPortStatusListing;
@ -25,7 +34,9 @@ describe('InputPortStatusListing', () => {
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [InputPortStatusListing]
declarations: [InputPortStatusListing],
imports: [PortStatusTable, SummaryTableFilterModule, NoopAnimationsModule],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(InputPortStatusListing);
component = fixture.componentInstance;

View File

@ -16,10 +16,47 @@
*/
import { Component } from '@angular/core';
import {
selectInputPortIdFromRoute,
selectInputPortStatusSnapshots,
selectSummaryListingLoadedTimestamp,
selectSummaryListingStatus
} from '../../state/summary-listing/summary-listing.selectors';
import { selectUser } from '../../../../state/user/user.selectors';
import { PortStatusSnapshotEntity, SummaryListingState } from '../../state/summary-listing';
import { Store } from '@ngrx/store';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
import * as SummaryListingActions from '../../state/summary-listing/summary-listing.actions';
@Component({
selector: 'input-port-status-listing',
templateUrl: './input-port-status-listing.component.html',
styleUrls: ['./input-port-status-listing.component.scss']
})
export class InputPortStatusListing {}
export class InputPortStatusListing {
portStatusSnapshots$ = this.store.select(selectInputPortStatusSnapshots);
loadedTimestamp$ = this.store.select(selectSummaryListingLoadedTimestamp);
summaryListingStatus$ = this.store.select(selectSummaryListingStatus);
currentUser$ = this.store.select(selectUser);
selectedPortId$ = this.store.select(selectInputPortIdFromRoute);
constructor(private store: Store<SummaryListingState>) {}
isInitialLoading(loadedTimestamp: string): boolean {
return loadedTimestamp == initialState.loadedTimestamp;
}
refreshSummaryListing() {
this.store.dispatch(SummaryListingActions.loadSummaryListing({ recursive: true }));
}
selectPort(port: PortStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.selectInputPortStatus({
request: {
id: port.id
}
})
);
}
}

View File

@ -18,9 +18,12 @@
import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
import { InputPortStatusListing } from './input-port-status-listing.component';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { ProcessGroupStatusTable } from '../process-group-status-listing/process-group-status-table/process-group-status-table.component';
import { PortStatusTable } from '../common/port-status-table/port-status-table.component';
@NgModule({
declarations: [InputPortStatusListing],
imports: [CommonModule]
imports: [CommonModule, NgxSkeletonLoaderModule, ProcessGroupStatusTable, PortStatusTable]
})
export class InputPortStatusListingModule {}

View File

@ -15,4 +15,33 @@
~ limitations under the License.
-->
<p>output-port-status-listing works!</p>
<ng-container>
<div *ngIf="isInitialLoading((loadedTimestamp$ | async)!); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</div>
<ng-template #loaded>
<div class="flex flex-col h-full gap-y-2">
<div class="flex-1" *ngIf="currentUser$ | async as user">
<ng-container>
<port-status-table
[ports]="(portStatusSnapshots$ | async)!"
[selectedPortId]="selectedPortId$ | async"
portType="output"
(selectPort)="selectPort($event)"
initialSortColumn="name"
initialSortDirection="asc"></port-status-table>
</ng-container>
</div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshSummaryListing()">
<i class="fa fa-refresh" [class.fa-spin]="(summaryListingStatus$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>
</ng-template>
</ng-container>

View File

@ -18,6 +18,11 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { OutputPortStatusListing } from './output-port-status-listing.component';
import { PortStatusTable } from '../common/port-status-table/port-status-table.component';
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('OutputPortStatusListing', () => {
let component: OutputPortStatusListing;
@ -25,7 +30,9 @@ describe('OutputPortStatusListing', () => {
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [OutputPortStatusListing]
declarations: [OutputPortStatusListing],
imports: [PortStatusTable, SummaryTableFilterModule, NoopAnimationsModule],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(OutputPortStatusListing);
component = fixture.componentInstance;

View File

@ -16,10 +16,49 @@
*/
import { Component } from '@angular/core';
import {
selectInputPortIdFromRoute,
selectInputPortStatusSnapshots,
selectOutputPortIdFromRoute,
selectOutputPortStatusSnapshots,
selectSummaryListingLoadedTimestamp,
selectSummaryListingStatus
} from '../../state/summary-listing/summary-listing.selectors';
import { selectUser } from '../../../../state/user/user.selectors';
import { Store } from '@ngrx/store';
import { PortStatusSnapshotEntity, SummaryListingState } from '../../state/summary-listing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
import * as SummaryListingActions from '../../state/summary-listing/summary-listing.actions';
@Component({
selector: 'output-port-status-listing',
templateUrl: './output-port-status-listing.component.html',
styleUrls: ['./output-port-status-listing.component.scss']
})
export class OutputPortStatusListing {}
export class OutputPortStatusListing {
portStatusSnapshots$ = this.store.select(selectOutputPortStatusSnapshots);
loadedTimestamp$ = this.store.select(selectSummaryListingLoadedTimestamp);
summaryListingStatus$ = this.store.select(selectSummaryListingStatus);
currentUser$ = this.store.select(selectUser);
selectedPortId$ = this.store.select(selectOutputPortIdFromRoute);
constructor(private store: Store<SummaryListingState>) {}
isInitialLoading(loadedTimestamp: string): boolean {
return loadedTimestamp == initialState.loadedTimestamp;
}
refreshSummaryListing() {
this.store.dispatch(SummaryListingActions.loadSummaryListing({ recursive: true }));
}
selectPort(port: PortStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.selectOutputPortStatus({
request: {
id: port.id
}
})
);
}
}

View File

@ -18,10 +18,12 @@
import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
import { OutputPortStatusListing } from './output-port-status-listing.component';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { PortStatusTable } from '../common/port-status-table/port-status-table.component';
@NgModule({
declarations: [OutputPortStatusListing],
exports: [OutputPortStatusListing],
imports: [CommonModule]
imports: [CommonModule, NgxSkeletonLoaderModule, PortStatusTable]
})
export class OutputPortStatusListingModule {}

View File

@ -15,4 +15,34 @@
~ limitations under the License.
-->
<p>process-group-status-listing works!</p>
<ng-container>
<div *ngIf="isInitialLoading((loadedTimestamp$ | async)!); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</div>
<ng-template #loaded>
<div class="flex flex-col h-full gap-y-2">
<div class="flex-1" *ngIf="currentUser$ | async as user">
<ng-container>
<process-group-status-table
[processGroups]="(processGroupStatusSnapshots$ | async)!"
[selectedProcessGroupId]="selectedProcessGroupId$ | async"
[rootProcessGroup]="(processGroupStatus$ | async)?.processGroupStatus?.aggregateSnapshot!"
(viewStatusHistory)="viewStatusHistory($event)"
(selectProcessGroup)="selectProcessGroup($event)"
initialSortColumn="name"
initialSortDirection="asc"></process-group-status-table>
</ng-container>
</div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshSummaryListing()">
<i class="fa fa-refresh" [class.fa-spin]="(summaryListingStatus$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>
</ng-template>
</ng-container>

View File

@ -18,6 +18,11 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ProcessGroupStatusListing } from './process-group-status-listing.component';
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { ProcessGroupStatusTable } from './process-group-status-table/process-group-status-table.component';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('ProcessGroupStatusListing', () => {
let component: ProcessGroupStatusListing;
@ -25,7 +30,9 @@ describe('ProcessGroupStatusListing', () => {
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [ProcessGroupStatusListing]
declarations: [ProcessGroupStatusListing],
imports: [SummaryTableFilterModule, ProcessGroupStatusTable, NoopAnimationsModule],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(ProcessGroupStatusListing);
component = fixture.componentInstance;

View File

@ -16,10 +16,95 @@
*/
import { Component } from '@angular/core';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
import * as SummaryListingActions from '../../state/summary-listing/summary-listing.actions';
import {
ProcessGroupStatusSnapshotEntity,
ProcessorStatusSnapshotEntity,
SummaryListingState
} from '../../state/summary-listing';
import { Store } from '@ngrx/store';
import {
selectProcessGroupIdFromRoute,
selectProcessGroupStatus,
selectProcessGroupStatusItem,
selectProcessGroupStatusSnapshots,
selectProcessorStatus,
selectProcessorStatusSnapshots,
selectSummaryListingLoadedTimestamp,
selectSummaryListingStatus,
selectViewStatusHistory
} from '../../state/summary-listing/summary-listing.selectors';
import { filter, switchMap, take } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { openStatusHistoryDialog } from '../../../../state/status-history/status-history.actions';
import { ComponentType } from '../../../../state/shared';
import { selectUser } from '../../../../state/user/user.selectors';
@Component({
selector: 'process-group-status-listing',
templateUrl: './process-group-status-listing.component.html',
styleUrls: ['./process-group-status-listing.component.scss']
})
export class ProcessGroupStatusListing {}
export class ProcessGroupStatusListing {
processGroupStatusSnapshots$ = this.store.select(selectProcessGroupStatusSnapshots);
loadedTimestamp$ = this.store.select(selectSummaryListingLoadedTimestamp);
summaryListingStatus$ = this.store.select(selectSummaryListingStatus);
currentUser$ = this.store.select(selectUser);
selectedProcessGroupId$ = this.store.select(selectProcessGroupIdFromRoute);
processGroupStatus$ = this.store.select(selectProcessGroupStatus);
constructor(private store: Store<SummaryListingState>) {
this.store
.select(selectViewStatusHistory)
.pipe(
filter((id: string) => !!id),
switchMap((id: string) =>
this.store.select(selectProcessGroupStatusItem(id)).pipe(
filter((pg) => !!pg),
take(1)
)
),
takeUntilDestroyed()
)
.subscribe((pg) => {
if (pg) {
this.store.dispatch(
openStatusHistoryDialog({
request: {
source: 'summary',
componentType: ComponentType.ProcessGroup,
componentId: pg.id
}
})
);
}
});
}
isInitialLoading(loadedTimestamp: string): boolean {
return loadedTimestamp == initialState.loadedTimestamp;
}
refreshSummaryListing() {
this.store.dispatch(SummaryListingActions.loadSummaryListing({ recursive: true }));
}
viewStatusHistory(pg: ProcessGroupStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.navigateToViewProcessGroupStatusHistory({
id: pg.id
})
);
}
selectProcessGroup(pg: ProcessGroupStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.selectProcessGroupStatus({
request: {
id: pg.id
}
})
);
}
}

View File

@ -18,10 +18,12 @@
import { NgModule } from '@angular/core';
import { ProcessGroupStatusListing } from './process-group-status-listing.component';
import { CommonModule } from '@angular/common';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { ProcessGroupStatusTable } from './process-group-status-table/process-group-status-table.component';
@NgModule({
declarations: [ProcessGroupStatusListing],
exports: [ProcessGroupStatusListing],
imports: [CommonModule]
imports: [CommonModule, NgxSkeletonLoaderModule, ProcessGroupStatusTable]
})
export class ProcessGroupStatusListingModule {}

View File

@ -0,0 +1,313 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="process-group-status-table h-full flex flex-col">
<!-- allow filtering of the table -->
<summary-table-filter
[filteredCount]="filteredCount"
[totalCount]="totalCount"
[filterableColumns]="filterableColumns"
[includeStatusFilter]="false"
[includePrimaryNodeOnlyFilter]="false"
(filterChanged)="applyFilter($event)"></summary-table-filter>
<div class="flex-1 relative">
<div class="listing-table overflow-y-auto border absolute inset-0">
<table
mat-table
[dataSource]="dataSource"
matSort
matSortDisableClear
(matSortChange)="sortData($event)"
[matSortActive]="initialSortColumn"
[matSortDirection]="initialSortDirection">
<!-- More Details Column -->
<ng-container matColumnDef="moreDetails">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<ng-container *ngIf="canRead(item)">
<div class="flex items-center gap-x-3">
<!-- TODO - handle read only in configure component? -->
<div
class="pointer fa fa-info-circle"
*ngIf="canRead(item)"
title="View Process Group Details"></div>
</div>
</ng-container>
</td>
</ng-container>
<!-- Name Column -->
<ng-container matColumnDef="name">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Name</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatName(item)">
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">
{{ formatName(item) }}
</div>
</td>
</ng-container>
<!-- Version State column -->
<ng-container matColumnDef="versionedFlowState">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Version State</div>
</th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-1.5" [title]="formatVersionedFlowState(item)">
<div [ngClass]="getVersionedFlowStateIcon(item)"></div>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap min-w-0">
{{ formatVersionedFlowState(item) }}
</div>
</div>
</td>
</ng-container>
<!-- Transferred column -->
<ng-container matColumnDef="transferred">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div
class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1"
title="Count / data size transferred to and from connections in the last 5 min">
<span
[ngClass]="{
underline: multiSort.active === 'transferred' && multiSort.sortValueIndex === 0
}"
>Transferred</span
>
<span
[ngClass]="{
underline: multiSort.active === 'transferred' && multiSort.sortValueIndex === 1
}"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatTransferred(item)">
{{ formatTransferred(item) }}
</td>
</ng-container>
<!-- Input column -->
<ng-container matColumnDef="in">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 0 }"
>In</span
>
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatIn(item)">
{{ formatIn(item) }}
</td>
</ng-container>
<!-- Read Write column -->
<ng-container matColumnDef="readWrite">
<th mat-header-cell *matHeaderCellDef mat-sort-header title="Data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'readWrite' && multiSort.sortValueIndex === 0
}"
>Read</span
>
<span>|</span>
<span
[ngClass]="{
underline: multiSort.active === 'readWrite' && multiSort.sortValueIndex === 1
}"
>Write</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatReadWrite(item)">
{{ formatReadWrite(item) }}
</td>
</ng-container>
<!-- Output column -->
<ng-container matColumnDef="out">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 0 }"
>Out</span
>
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatOut(item)">
{{ formatOut(item) }}
</td>
</ng-container>
<!-- Sent column -->
<ng-container matColumnDef="sent">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'sent' && multiSort.sortValueIndex === 0 }"
>Sent</span
>
<span
[ngClass]="{ underline: multiSort.active === 'sent' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatSent(item)">
{{ formatSent(item) }}
</td>
</ng-container>
<!-- Received column -->
<ng-container matColumnDef="received">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'received' && multiSort.sortValueIndex === 0
}"
>Received</span
>
<span
[ngClass]="{
underline: multiSort.active === 'received' && multiSort.sortValueIndex === 1
}"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatReceived(item)">
{{ formatReceived(item) }}
</td>
</ng-container>
<!-- Received column -->
<ng-container matColumnDef="activeThreads">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Total active thread count within ProcessGroup (% of total active thread count compared to overall active thread count in root ProcessGroup) in the last 5 min">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'activeThreads' && multiSort.sortValueIndex === 0
}"
>Active Threads</span
>
<span
[ngClass]="{
underline: multiSort.active === 'activeThreads' && multiSort.sortValueIndex === 1
}"
>(%)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatActiveThreads(item)">
{{ formatActiveThreads(item) }}
</td>
</ng-container>
<!-- Tasks column -->
<ng-container matColumnDef="tasks">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Total task duration within ProcessGroup (% of total task duration compared to overall task duration in root ProcessGroup) in the last 5 min">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'tasks' && multiSort.sortValueIndex === 0
}"
>Total Task Duration</span
>
<span
[ngClass]="{
underline: multiSort.active === 'tasks' && multiSort.sortValueIndex === 1
}"
>(%)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatTasks(item)">
{{ formatTasks(item) }}
</td>
</ng-container>
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3">
<div
class="pointer fa fa-long-arrow-right"
[routerLink]="getProcessGroupLink(item)"
(click)="$event.stopPropagation()"
title="Go to Process Group {{ item?.processGroupStatusSnapshot?.name }}"></div>
<div
class="pointer fa fa-area-chart"
title="View Status History"
(click)="viewStatusHistoryClicked($event, item)"></div>
</div>
</td>
</ng-container>
<tr mat-header-row *matHeaderRowDef="displayedColumns; sticky: true"></tr>
<tr
mat-row
*matRowDef="let row; let even = even; columns: displayedColumns"
[class.even]="even"
(click)="select(row)"
[class.selected]="isSelected(row)"></tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,30 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.process-group-status-table {
.listing-table {
.mat-column-moreDetails {
width: 32px;
min-width: 32px;
}
.mat-column-actions {
width: 72px;
min-width: 72px;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ProcessGroupStatusTable } from './process-group-status-table.component';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule } from '@angular/material/sort';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
describe('ProcessGroupStatusTable', () => {
let component: ProcessGroupStatusTable;
let fixture: ComponentFixture<ProcessGroupStatusTable>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [ProcessGroupStatusTable, SummaryTableFilterModule, MatSortModule, NoopAnimationsModule]
});
fixture = TestBed.createComponent(ProcessGroupStatusTable);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,436 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, EventEmitter, Input, Output } from '@angular/core';
import { CommonModule } from '@angular/common';
import { MatSortModule, Sort, SortDirection } from '@angular/material/sort';
import { MultiSort } from '../../common';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import {
ProcessGroupStatusSnapshot,
ProcessGroupStatusSnapshotEntity,
ProcessorStatusSnapshot,
ProcessorStatusSnapshotEntity,
VersionedFlowState
} from '../../../state/summary-listing';
import {
SummaryTableFilterArgs,
SummaryTableFilterColumn
} from '../../common/summary-table-filter/summary-table-filter.component';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { RouterLink } from '@angular/router';
export type SupportedColumns =
| 'name'
| 'versionedFlowState'
| 'transferred'
| 'in'
| 'readWrite'
| 'out'
| 'sent'
| 'received'
| 'activeThreads'
| 'tasks';
@Component({
selector: 'process-group-status-table',
standalone: true,
imports: [CommonModule, MatSortModule, MatTableModule, SummaryTableFilterModule, RouterLink],
templateUrl: './process-group-status-table.component.html',
styleUrls: ['./process-group-status-table.component.scss', '../../../../../../assets/styles/listing-table.scss']
})
export class ProcessGroupStatusTable {
private _initialSortColumn: SupportedColumns = 'name';
private _initialSortDirection: SortDirection = 'asc';
filterableColumns: SummaryTableFilterColumn[] = [{ key: 'name', label: 'name' }];
totalCount: number = 0;
filteredCount: number = 0;
multiSort: MultiSort = {
active: this._initialSortColumn,
direction: this._initialSortDirection,
sortValueIndex: 0,
totalValues: 2
};
displayedColumns: string[] = [
'moreDetails',
'name',
'versionedFlowState',
'transferred',
'in',
'readWrite',
'out',
'sent',
'received',
'activeThreads',
'tasks',
'actions'
];
dataSource: MatTableDataSource<ProcessGroupStatusSnapshotEntity> =
new MatTableDataSource<ProcessGroupStatusSnapshotEntity>();
constructor(private nifiCommon: NiFiCommon) {}
applyFilter(filter: SummaryTableFilterArgs) {
this.dataSource.filter = JSON.stringify(filter);
this.filteredCount = this.dataSource.filteredData.length;
}
@Input() selectedProcessGroupId!: string;
@Input() set initialSortColumn(initialSortColumn: SupportedColumns) {
this._initialSortColumn = initialSortColumn;
this.multiSort = { ...this.multiSort, active: initialSortColumn };
}
get initialSortColumn() {
return this._initialSortColumn;
}
@Input() set initialSortDirection(initialSortDirection: SortDirection) {
this._initialSortDirection = initialSortDirection;
this.multiSort = { ...this.multiSort, direction: initialSortDirection };
}
get initialSortDirection() {
return this._initialSortDirection;
}
@Input() rootProcessGroup!: ProcessGroupStatusSnapshot;
@Input() set processGroups(processGroups: ProcessGroupStatusSnapshotEntity[]) {
if (processGroups) {
this.dataSource.data = this.sortEntities(processGroups, this.multiSort);
this.dataSource.filterPredicate = (data: ProcessGroupStatusSnapshotEntity, filter: string): boolean => {
const { filterTerm, filterColumn } = JSON.parse(filter);
if (filterTerm === '') {
return true;
}
const field: string = data.processGroupStatusSnapshot[
filterColumn as keyof ProcessGroupStatusSnapshot
] as string;
return this.nifiCommon.stringContains(field, filterTerm, true);
};
this.totalCount = processGroups.length;
this.filteredCount = processGroups.length;
}
}
@Output() viewStatusHistory: EventEmitter<ProcessGroupStatusSnapshotEntity> =
new EventEmitter<ProcessGroupStatusSnapshotEntity>();
@Output() selectProcessGroup: EventEmitter<ProcessGroupStatusSnapshotEntity> =
new EventEmitter<ProcessGroupStatusSnapshotEntity>();
formatName(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.name;
}
private versionedFlowStateMap: { [key: string]: { classes: string; label: string } } = {
STALE: {
classes: 'fa fa-arrow-circle-up stale',
label: 'Stale'
},
LOCALLY_MODIFIED: {
classes: 'fa fa-asterisk locally-modified',
label: 'Locally modified'
},
UP_TO_DATE: {
classes: 'fa fa-check up-to-date',
label: 'Up to date'
},
LOCALLY_MODIFIED_AND_STALE: {
classes: 'fa fa-exclamation-circle locally-modified-and-stale',
label: 'Locally modified and stale'
},
SYNC_FAILURE: {
classes: 'fa fa-question sync-failure',
label: 'Sync failure'
}
};
formatVersionedFlowState(pg: ProcessGroupStatusSnapshotEntity): string {
if (!pg.processGroupStatusSnapshot.versionedFlowState) {
return '';
}
return this.versionedFlowStateMap[pg.processGroupStatusSnapshot.versionedFlowState].label;
}
getVersionedFlowStateIcon(pg: ProcessGroupStatusSnapshotEntity): string {
if (!pg.processGroupStatusSnapshot.versionedFlowState) {
return '';
}
return this.versionedFlowStateMap[pg.processGroupStatusSnapshot.versionedFlowState].classes;
}
formatTransferred(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.transferred;
}
formatIn(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.input;
}
formatReadWrite(pg: ProcessGroupStatusSnapshotEntity): string {
return `${pg.processGroupStatusSnapshot.read} | ${pg.processGroupStatusSnapshot.written}`;
}
formatOut(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.output;
}
formatSent(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.sent;
}
formatReceived(pg: ProcessGroupStatusSnapshotEntity): string {
return pg.processGroupStatusSnapshot.received;
}
formatActiveThreads(pg: ProcessGroupStatusSnapshotEntity): string {
const percentage: number = this.calculatePercent(
pg.processGroupStatusSnapshot.activeThreadCount,
this.rootProcessGroup.activeThreadCount
);
return `${pg.processGroupStatusSnapshot.activeThreadCount} (${percentage}%)`;
}
formatTasks(pg: ProcessGroupStatusSnapshotEntity): string {
const percentage: number = this.calculatePercent(
pg.processGroupStatusSnapshot.processingNanos,
this.rootProcessGroup.processingNanos
);
return `${this.nifiCommon.formatDuration(pg.processGroupStatusSnapshot.processingNanos)} (${percentage}%)`;
}
private calculatePercent(used: number, total: number): number {
if (total !== undefined && total > 0) {
return Math.round((used / total) * 100);
}
return 0;
}
private supportsMultiValuedSort(sort: Sort): boolean {
switch (sort.active) {
case 'transferred':
case 'in':
case 'out':
case 'readWrite':
case 'received':
case 'sent':
case 'activeThreads':
case 'tasks':
return true;
default:
return false;
}
}
private setMultiSort(sort: Sort) {
const { active, direction, sortValueIndex, totalValues } = this.multiSort;
if (this.supportsMultiValuedSort(sort)) {
if (active === sort.active) {
// previous sort was of the same column
if (direction === 'desc' && sort.direction === 'asc') {
// change from previous index to the next
const newIndex = sortValueIndex + 1 >= totalValues ? 0 : sortValueIndex + 1;
this.multiSort = { ...sort, sortValueIndex: newIndex, totalValues };
} else {
this.multiSort = { ...sort, sortValueIndex, totalValues };
}
} else {
// sorting a different column, just reset
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
} else {
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
}
sortData(sort: Sort) {
this.setMultiSort(sort);
this.dataSource.data = this.sortEntities(this.dataSource.data, sort);
}
private sortEntities(data: ProcessGroupStatusSnapshotEntity[], sort: Sort): ProcessGroupStatusSnapshotEntity[] {
if (!data) {
return [];
}
let aggregateDuration: number = 0;
let aggregateActiveThreads: number = 0;
if (this.rootProcessGroup) {
aggregateDuration = this.rootProcessGroup.processingNanos;
aggregateActiveThreads = this.rootProcessGroup.activeThreadCount;
}
return data.slice().sort((a, b) => {
const isAsc = sort.direction === 'asc';
let retVal: number = 0;
switch (sort.active) {
case 'name':
retVal = this.nifiCommon.compareString(this.formatName(a), this.formatName(b));
break;
case 'versionedFlowState':
retVal = this.nifiCommon.compareString(
this.formatVersionedFlowState(a),
this.formatVersionedFlowState(b)
);
break;
case 'transferred':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.flowFilesTransferred,
b.processGroupStatusSnapshot.flowFilesTransferred
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesTransferred,
b.processGroupStatusSnapshot.bytesTransferred
);
}
break;
case 'in':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.flowFilesIn,
b.processGroupStatusSnapshot.flowFilesIn
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesIn,
b.processGroupStatusSnapshot.bytesIn
);
}
break;
case 'out':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.flowFilesOut,
b.processGroupStatusSnapshot.flowFilesOut
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesOut,
b.processGroupStatusSnapshot.bytesOut
);
}
break;
case 'readWrite':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesRead,
b.processGroupStatusSnapshot.bytesRead
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesWritten,
b.processGroupStatusSnapshot.bytesWritten
);
}
break;
case 'sent':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.flowFilesSent,
b.processGroupStatusSnapshot.flowFilesSent
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesSent,
b.processGroupStatusSnapshot.bytesSent
);
}
break;
case 'received':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.flowFilesReceived,
b.processGroupStatusSnapshot.flowFilesReceived
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.bytesReceived,
b.processGroupStatusSnapshot.bytesReceived
);
}
break;
case 'activeThreads':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.activeThreadCount,
b.processGroupStatusSnapshot.activeThreadCount
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.activeThreadCount / aggregateActiveThreads,
b.processGroupStatusSnapshot.activeThreadCount / aggregateActiveThreads
);
}
break;
case 'tasks':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.processingNanos,
b.processGroupStatusSnapshot.processingNanos
);
} else {
retVal = this.nifiCommon.compareNumber(
a.processGroupStatusSnapshot.processingNanos / aggregateDuration,
b.processGroupStatusSnapshot.processingNanos / aggregateDuration
);
}
break;
default:
retVal = 0;
}
return retVal * (isAsc ? 1 : -1);
});
}
canRead(pg: ProcessGroupStatusSnapshotEntity) {
return pg.canRead;
}
getProcessGroupLink(pg: ProcessGroupStatusSnapshotEntity): string[] {
return ['/process-groups', pg.id];
}
select(pg: ProcessGroupStatusSnapshotEntity): void {
this.selectProcessGroup.next(pg);
}
isSelected(pg: ProcessGroupStatusSnapshotEntity): boolean {
if (this.selectedProcessGroupId) {
return pg.id === this.selectedProcessGroupId;
}
return false;
}
viewStatusHistoryClicked(event: MouseEvent, pg: ProcessGroupStatusSnapshotEntity): void {
event.stopPropagation();
this.viewStatusHistory.next(pg);
}
}

View File

@ -22,9 +22,9 @@ import { SummaryTableFilter } from '../common/summary-table-filter/summary-table
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { ProcessorStatusListingModule } from './processor-status-listing.module';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../../counters/state/counter-listing/counter-listing.reducer';
import { ProcessorStatusTable } from './processor-status-table/processor-status-table.component';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('ProcessorStatusListing', () => {
let component: ProcessorStatusListing;

View File

@ -52,24 +52,30 @@
<!-- Name Column -->
<ng-container matColumnDef="name">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Name</th>
<td mat-cell *matCellDef="let item">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Name</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatName(item)">
{{ formatName(item) }}
</td>
</ng-container>
<!-- Type column -->
<ng-container matColumnDef="type">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Type</th>
<td mat-cell *matCellDef="let item">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Type</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatType(item)">
{{ formatType(item) }}
</td>
</ng-container>
<!-- Process Group column -->
<ng-container matColumnDef="processGroup">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Process Group</th>
<td mat-cell *matCellDef="let item">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Process Group</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatProcessGroup(item)">
{{ formatProcessGroup(item) }}
</td>
</ng-container>
@ -78,20 +84,20 @@
<ng-container matColumnDef="runStatus">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Run Status</th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-2">
<div [ngClass]="getRunStatusIcon(item)"></div>
<div>{{ formatRunStatus(item) }}</div>
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1.5">
<span [ngClass]="getRunStatusIcon(item)"></span>
<span [title]="formatRunStatus(item)">{{ formatRunStatus(item) }}</span>
<ng-container *ngIf="item.processorStatusSnapshot as pg">
<div
<span
*ngIf="pg.terminatedThreadCount > 0; else activeThreads"
title="Threads: (Active / Terminated)">
({{ pg.activeThreadCount }}/{{ pg.terminatedThreadCount }})
</div>
title="Threads: (Active / Terminated)"
>({{ pg.activeThreadCount }}/{{ pg.terminatedThreadCount }})</span
>
<ng-template #activeThreads>
<div *ngIf="pg.activeThreadCount > 0" title="Active Threads">
({{ pg.activeThreadCount }})
</div>
<span *ngIf="pg.activeThreadCount > 0" title="Active Threads"
>({{ pg.activeThreadCount }})</span
>
</ng-template>
</ng-container>
</div>
@ -105,17 +111,17 @@
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="flex items-center gap-x-1">
<div [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 0 }">
In
</div>
<div [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 1 }">
(Size)
</div>
<div class="font-light">5 min</div>
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 0 }"
>In</span
>
<span [ngClass]="{ underline: multiSort.active === 'in' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item">
<td mat-cell *matCellDef="let item" [title]="formatIn(item)">
{{ formatIn(item) }}
</td>
</ng-container>
@ -123,24 +129,24 @@
<!-- Read Write column -->
<ng-container matColumnDef="readWrite">
<th mat-header-cell *matHeaderCellDef mat-sort-header title="Data size in the last 5 minutes">
<div class="flex items-center gap-x-1">
<div
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'readWrite' && multiSort.sortValueIndex === 0
}">
Read
</div>
<div>|</div>
<div
}"
>Read</span
>
<span>|</span>
<span
[ngClass]="{
underline: multiSort.active === 'readWrite' && multiSort.sortValueIndex === 1
}">
Write
</div>
<div class="font-light">5 min</div>
}"
>Write</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item">
<td mat-cell *matCellDef="let item" [title]="formatReadWrite(item)">
{{ formatReadWrite(item) }}
</td>
</ng-container>
@ -152,19 +158,19 @@
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="flex items-center gap-x-1">
<div
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 0 }">
Out
</div>
<div
</span>
<span
[ngClass]="{ underline: multiSort.active === 'out' && multiSort.sortValueIndex === 1 }">
(Size)
</div>
<div class="font-light">5 min</div>
</span>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item">
<td mat-cell *matCellDef="let item" [title]="formatOut(item)">
{{ formatOut(item) }}
</td>
</ng-container>
@ -176,24 +182,24 @@
*matHeaderCellDef
mat-sort-header
title="Count / duration in the last 5 minutes">
<div class="flex items-center gap-x-1">
<div
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'tasks' && multiSort.sortValueIndex === 0
}">
Tasks
</div>
<div>|</div>
<div
}"
>Tasks</span
>
<span>|</span>
<span
[ngClass]="{
underline: multiSort.active === 'tasks' && multiSort.sortValueIndex === 1
}">
Time
</div>
<div class="font-light">5 min</div>
}"
>Time</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item">
<td mat-cell *matCellDef="let item" [title]="formatTasks(item)">
{{ formatTasks(item) }}
</td>
</ng-container>

View File

@ -16,6 +16,11 @@
*/
.processor-status-table {
.listing-table {
.mat-column-moreDetails {
width: 32px;
min-width: 32px;
}
.mat-column-actions {
width: 72px;
min-width: 72px;

View File

@ -19,19 +19,19 @@ import { Component, EventEmitter, Input, Output } from '@angular/core';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { ProcessorStatusSnapshot, ProcessorStatusSnapshotEntity } from '../../../state/summary-listing';
import { MatSortModule, Sort, SortDirection } from '@angular/material/sort';
import { SummaryTableFilterArgs } from '../../common/summary-table-filter/summary-table-filter.component';
import {
SummaryTableFilterArgs,
SummaryTableFilterColumn
} from '../../common/summary-table-filter/summary-table-filter.component';
import { RouterLink } from '@angular/router';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { NgClass, NgIf } from '@angular/common';
import { ComponentType } from '../../../../../state/shared';
import { MultiSort } from '../../common';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
export type SupportedColumns = 'name' | 'type' | 'processGroup' | 'runStatus' | 'in' | 'out' | 'readWrite' | 'tasks';
export interface MultiSort extends Sort {
sortValueIndex: number;
totalValues: number;
}
@Component({
selector: 'processor-status-table',
templateUrl: './processor-status-table.component.html',
@ -43,7 +43,10 @@ export class ProcessorStatusTable {
private _initialSortColumn: SupportedColumns = 'name';
private _initialSortDirection: SortDirection = 'asc';
filterableColumns: string[] = ['name', 'type'];
filterableColumns: SummaryTableFilterColumn[] = [
{ key: 'name', label: 'name' },
{ key: 'type', label: 'type' }
];
totalCount: number = 0;
filteredCount: number = 0;
@ -69,10 +72,10 @@ export class ProcessorStatusTable {
dataSource: MatTableDataSource<ProcessorStatusSnapshotEntity> =
new MatTableDataSource<ProcessorStatusSnapshotEntity>();
constructor() {}
constructor(private nifiCommon: NiFiCommon) {}
applyFilter(filter: SummaryTableFilterArgs) {
this.dataSource.filter = `${filter.filterTerm}|${filter.filterColumn}|${filter.filterStatus}|${filter.primaryOnly}`;
this.dataSource.filter = JSON.stringify(filter);
this.filteredCount = this.dataSource.filteredData.length;
}
@ -100,11 +103,7 @@ export class ProcessorStatusTable {
if (processors) {
this.dataSource.data = this.sortEntities(processors, this.multiSort);
this.dataSource.filterPredicate = (data: ProcessorStatusSnapshotEntity, filter: string): boolean => {
const filterArray: string[] = filter.split('|');
const filterTerm: string = filterArray[0] || '';
const filterColumn: string = filterArray[1];
const filterStatus: string = filterArray[2];
const primaryOnly: boolean = filterArray[3] === 'true';
const { filterTerm, filterColumn, filterStatus, primaryOnly } = JSON.parse(filter);
const matchOnStatus: boolean = filterStatus !== 'All';
if (primaryOnly) {
@ -121,16 +120,10 @@ export class ProcessorStatusTable {
return true;
}
try {
const filterExpression: RegExp = new RegExp(filterTerm, 'i');
const field: string = data.processorStatusSnapshot[
filterColumn as keyof ProcessorStatusSnapshot
] as string;
return field.search(filterExpression) >= 0;
} catch (e) {
// invalid regex;
return false;
}
const field: string = data.processorStatusSnapshot[
filterColumn as keyof ProcessorStatusSnapshot
] as string;
return this.nifiCommon.stringContains(field, filterTerm, true);
};
this.totalCount = processors.length;

View File

@ -15,4 +15,33 @@
~ limitations under the License.
-->
<p>remote-process-group-status-listing works!</p>
<ng-container>
<div *ngIf="isInitialLoading((loadedTimestamp$ | async)!); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</div>
<ng-template #loaded>
<div class="flex flex-col h-full gap-y-2">
<div class="flex-1" *ngIf="currentUser$ | async as user">
<ng-container>
<remote-process-group-status-table
[remoteProcessGroups]="(rpgStatusSnapshots$ | async)!"
[selectedRemoteProcessGroupId]="selectedRpgId$ | async"
(selectRemoteProcessGroup)="selectRemoteProcessGroup($event)"
(viewStatusHistory)="viewStatusHistory($event)"
initialSortColumn="name"
initialSortDirection="asc"></remote-process-group-status-table>
</ng-container>
</div>
<div class="flex justify-between">
<div class="refresh-container flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshSummaryListing()">
<i class="fa fa-refresh" [class.fa-spin]="(summaryListingStatus$ | async) === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>
</ng-template>
</ng-container>

View File

@ -18,6 +18,11 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { RemoteProcessGroupStatusListing } from './remote-process-group-status-listing.component';
import { RemoteProcessGroupStatusTable } from './remote-process-group-status-table/remote-process-group-status-table.component';
import { SummaryTableFilterModule } from '../common/summary-table-filter/summary-table-filter.module';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
describe('RemoteProcessGroupStatusListing', () => {
let component: RemoteProcessGroupStatusListing;
@ -25,7 +30,9 @@ describe('RemoteProcessGroupStatusListing', () => {
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [RemoteProcessGroupStatusListing]
declarations: [RemoteProcessGroupStatusListing],
imports: [RemoteProcessGroupStatusTable, SummaryTableFilterModule, NoopAnimationsModule],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(RemoteProcessGroupStatusListing);
component = fixture.componentInstance;

View File

@ -16,10 +16,87 @@
*/
import { Component } from '@angular/core';
import {
selectRemoteProcessGroupIdFromRoute,
selectRemoteProcessGroupStatus,
selectRemoteProcessGroupStatusSnapshots,
selectSummaryListingLoadedTimestamp,
selectSummaryListingStatus,
selectViewStatusHistory
} from '../../state/summary-listing/summary-listing.selectors';
import { selectUser } from '../../../../state/user/user.selectors';
import { Store } from '@ngrx/store';
import { RemoteProcessGroupStatusSnapshotEntity, SummaryListingState } from '../../state/summary-listing';
import { filter, switchMap, take } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { openStatusHistoryDialog } from '../../../../state/status-history/status-history.actions';
import { ComponentType } from '../../../../state/shared';
import { initialState } from '../../state/summary-listing/summary-listing.reducer';
import * as SummaryListingActions from '../../state/summary-listing/summary-listing.actions';
@Component({
selector: 'remote-process-group-status-listing',
templateUrl: './remote-process-group-status-listing.component.html',
styleUrls: ['./remote-process-group-status-listing.component.scss']
})
export class RemoteProcessGroupStatusListing {}
export class RemoteProcessGroupStatusListing {
loadedTimestamp$ = this.store.select(selectSummaryListingLoadedTimestamp);
summaryListingStatus$ = this.store.select(selectSummaryListingStatus);
currentUser$ = this.store.select(selectUser);
rpgStatusSnapshots$ = this.store.select(selectRemoteProcessGroupStatusSnapshots);
selectedRpgId$ = this.store.select(selectRemoteProcessGroupIdFromRoute);
constructor(private store: Store<SummaryListingState>) {
this.store
.select(selectViewStatusHistory)
.pipe(
filter((id: string) => !!id),
switchMap((id: string) =>
this.store.select(selectRemoteProcessGroupStatus(id)).pipe(
filter((connection) => !!connection),
take(1)
)
),
takeUntilDestroyed()
)
.subscribe((rpg) => {
if (rpg) {
this.store.dispatch(
openStatusHistoryDialog({
request: {
source: 'summary',
componentType: ComponentType.RemoteProcessGroup,
componentId: rpg.id
}
})
);
}
});
}
isInitialLoading(loadedTimestamp: string): boolean {
return loadedTimestamp == initialState.loadedTimestamp;
}
refreshSummaryListing() {
this.store.dispatch(SummaryListingActions.loadSummaryListing({ recursive: true }));
}
selectRemoteProcessGroup(rpg: RemoteProcessGroupStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.selectRemoteProcessGroupStatus({
request: {
id: rpg.id
}
})
);
}
viewStatusHistory(rpg: RemoteProcessGroupStatusSnapshotEntity): void {
this.store.dispatch(
SummaryListingActions.navigateToViewRemoteProcessGroupStatusHistory({
id: rpg.id
})
);
}
}

View File

@ -18,10 +18,12 @@
import { NgModule } from '@angular/core';
import { RemoteProcessGroupStatusListing } from './remote-process-group-status-listing.component';
import { CommonModule } from '@angular/common';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { RemoteProcessGroupStatusTable } from './remote-process-group-status-table/remote-process-group-status-table.component';
@NgModule({
declarations: [RemoteProcessGroupStatusListing],
exports: [RemoteProcessGroupStatusListing],
imports: [CommonModule]
imports: [CommonModule, NgxSkeletonLoaderModule, RemoteProcessGroupStatusTable]
})
export class RemoteProcessGroupStatusListingModule {}

View File

@ -0,0 +1,158 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="remote-process-group-status-table h-full flex flex-col">
<!-- allow filtering of the table -->
<summary-table-filter
[filteredCount]="filteredCount"
[totalCount]="totalCount"
[filterableColumns]="filterableColumns"
[includeStatusFilter]="false"
[includePrimaryNodeOnlyFilter]="false"
(filterChanged)="applyFilter($event)"></summary-table-filter>
<div class="flex-1 relative">
<div class="listing-table overflow-y-auto border absolute inset-0">
<table
mat-table
[dataSource]="dataSource"
matSort
matSortDisableClear
(matSortChange)="sortData($event)"
[matSortActive]="initialSortColumn"
[matSortDirection]="initialSortDirection">
<!-- More Details Column -->
<ng-container matColumnDef="moreDetails">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item"></td>
</ng-container>
<!-- Name Column -->
<ng-container matColumnDef="name">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Name</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatName(item)">
{{ formatName(item) }}
</td>
</ng-container>
<!-- Target URI Column -->
<ng-container matColumnDef="uri">
<th mat-header-cell *matHeaderCellDef mat-sort-header>
<div class="flex-1 overflow-ellipsis overflow-hidden whitespace-nowrap">Target URI</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatUri(item)">
{{ formatUri(item) }}
</td>
</ng-container>
<!-- Transmission Status column -->
<ng-container matColumnDef="transmitting">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Transmitting</th>
<td mat-cell *matCellDef="let item">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1.5">
<span [ngClass]="getTransmissionStatusIcon(item)"></span>
<span [title]="formatTransmitting(item)">{{ formatTransmitting(item) }}</span>
<span *ngIf="item.activeThreadCount > 0" title="Active Threads"
>({{ item.activeThreadCount }})</span
>
</div>
</td>
</ng-container>
<!-- Sent column -->
<ng-container matColumnDef="sent">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{ underline: multiSort.active === 'sent' && multiSort.sortValueIndex === 0 }"
>Sent</span
>
<span
[ngClass]="{ underline: multiSort.active === 'sent' && multiSort.sortValueIndex === 1 }"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatSent(item)">
{{ formatSent(item) }}
</td>
</ng-container>
<!-- Received column -->
<ng-container matColumnDef="received">
<th
mat-header-cell
*matHeaderCellDef
mat-sort-header
title="Count / data size in the last 5 minutes">
<div class="inline-block overflow-hidden overflow-ellipsis whitespace-nowrap space-x-1">
<span
[ngClass]="{
underline: multiSort.active === 'received' && multiSort.sortValueIndex === 0
}"
>Received</span
>
<span
[ngClass]="{
underline: multiSort.active === 'received' && multiSort.sortValueIndex === 1
}"
>(Size)</span
>
<span class="font-light">5 min</span>
</div>
</th>
<td mat-cell *matCellDef="let item" [title]="formatReceived(item)">
{{ formatReceived(item) }}
</td>
</ng-container>
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3">
<div
class="pointer fa fa-long-arrow-right"
[routerLink]="getRemoteProcessGroupLink(item)"
(click)="$event.stopPropagation()"
title="Go to remote process group"></div>
<div
class="pointer fa fa-area-chart"
title="View Status History"
(click)="viewStatusHistoryClicked($event, item)"></div>
</div>
</td>
</ng-container>
<tr mat-header-row *matHeaderRowDef="displayedColumns; sticky: true"></tr>
<tr
mat-row
*matRowDef="let row; let even = even; columns: displayedColumns"
[class.even]="even"
(click)="select(row)"
[class.selected]="isSelected(row)"></tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,30 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.remote-process-group-status-table {
.listing-table {
.mat-column-moreDetails {
width: 32px;
min-width: 32px;
}
.mat-column-actions {
width: 72px;
min-width: 72px;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { RemoteProcessGroupStatusTable } from './remote-process-group-status-table.component';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule } from '@angular/material/sort';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
describe('RemoteProcessGroupStatusTableComponent', () => {
let component: RemoteProcessGroupStatusTable;
let fixture: ComponentFixture<RemoteProcessGroupStatusTable>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [RemoteProcessGroupStatusTable, SummaryTableFilterModule, MatSortModule, NoopAnimationsModule]
});
fixture = TestBed.createComponent(RemoteProcessGroupStatusTable);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, EventEmitter, Input, Output } from '@angular/core';
import { CommonModule } from '@angular/common';
import { SummaryTableFilterModule } from '../../common/summary-table-filter/summary-table-filter.module';
import { MatSortModule, Sort, SortDirection } from '@angular/material/sort';
import {
SummaryTableFilterArgs,
SummaryTableFilterColumn
} from '../../common/summary-table-filter/summary-table-filter.component';
import { MultiSort } from '../../common';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import {
PortStatusSnapshot,
RemoteProcessGroupStatusSnapshot,
RemoteProcessGroupStatusSnapshotEntity
} from '../../../state/summary-listing';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { ComponentType } from '../../../../../state/shared';
import { RouterLink } from '@angular/router';
export type SupportedColumns = 'name' | 'uri' | 'transmitting' | 'sent' | 'received';
@Component({
selector: 'remote-process-group-status-table',
standalone: true,
imports: [CommonModule, SummaryTableFilterModule, MatSortModule, MatTableModule, RouterLink],
templateUrl: './remote-process-group-status-table.component.html',
styleUrls: [
'./remote-process-group-status-table.component.scss',
'../../../../../../assets/styles/listing-table.scss'
]
})
export class RemoteProcessGroupStatusTable {
private _initialSortColumn: SupportedColumns = 'name';
private _initialSortDirection: SortDirection = 'asc';
filterableColumns: SummaryTableFilterColumn[] = [
{ key: 'name', label: 'name' },
{ key: 'targetUri', label: 'uri' }
];
totalCount: number = 0;
filteredCount: number = 0;
multiSort: MultiSort = {
active: this._initialSortColumn,
direction: this._initialSortDirection,
sortValueIndex: 0,
totalValues: 2
};
displayedColumns: string[] = ['moreDetails', 'name', 'uri', 'transmitting', 'sent', 'received', 'actions'];
dataSource: MatTableDataSource<RemoteProcessGroupStatusSnapshotEntity> =
new MatTableDataSource<RemoteProcessGroupStatusSnapshotEntity>();
constructor(private nifiCommon: NiFiCommon) {}
@Input() set initialSortColumn(initialSortColumn: SupportedColumns) {
this._initialSortColumn = initialSortColumn;
this.multiSort = { ...this.multiSort, active: initialSortColumn };
}
get initialSortColumn() {
return this._initialSortColumn;
}
@Input() set initialSortDirection(initialSortDirection: SortDirection) {
this._initialSortDirection = initialSortDirection;
this.multiSort = { ...this.multiSort, direction: initialSortDirection };
}
get initialSortDirection() {
return this._initialSortDirection;
}
@Input() selectedRemoteProcessGroupId!: string;
@Input() set remoteProcessGroups(rpgs: RemoteProcessGroupStatusSnapshotEntity[]) {
if (rpgs) {
this.dataSource.data = this.sortEntities(rpgs, this.multiSort);
this.dataSource.filterPredicate = (data: RemoteProcessGroupStatusSnapshotEntity, filter: string) => {
const { filterTerm, filterColumn } = JSON.parse(filter);
if (filterTerm === '') {
return true;
}
const field: string = data.remoteProcessGroupStatusSnapshot[
filterColumn as keyof RemoteProcessGroupStatusSnapshot
] as string;
return this.nifiCommon.stringContains(field, filterTerm, true);
};
this.totalCount = rpgs.length;
this.filteredCount = rpgs.length;
}
}
@Output() viewStatusHistory: EventEmitter<RemoteProcessGroupStatusSnapshotEntity> =
new EventEmitter<RemoteProcessGroupStatusSnapshotEntity>();
@Output() selectRemoteProcessGroup: EventEmitter<RemoteProcessGroupStatusSnapshotEntity> =
new EventEmitter<RemoteProcessGroupStatusSnapshotEntity>();
applyFilter(filter: SummaryTableFilterArgs) {
this.dataSource.filter = JSON.stringify(filter);
this.filteredCount = this.dataSource.filteredData.length;
}
getRemoteProcessGroupLink(rpg: RemoteProcessGroupStatusSnapshotEntity): string[] {
return [
'/process-groups',
rpg.remoteProcessGroupStatusSnapshot.groupId,
ComponentType.RemoteProcessGroup,
rpg.id
];
}
select(rpg: RemoteProcessGroupStatusSnapshotEntity) {
this.selectRemoteProcessGroup.next(rpg);
}
isSelected(rpg: RemoteProcessGroupStatusSnapshotEntity): boolean {
if (this.selectedRemoteProcessGroupId) {
return rpg.id === this.selectedRemoteProcessGroupId;
}
return false;
}
canRead(rpg: RemoteProcessGroupStatusSnapshotEntity): boolean {
return rpg.canRead;
}
sortData(sort: Sort) {
this.setMultiSort(sort);
this.dataSource.data = this.sortEntities(this.dataSource.data, sort);
}
viewStatusHistoryClicked(event: MouseEvent, rpg: RemoteProcessGroupStatusSnapshotEntity): void {
event.stopPropagation();
this.viewStatusHistory.next(rpg);
}
formatName(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
return rpg.remoteProcessGroupStatusSnapshot.name;
}
formatTransmitting(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
if (rpg.remoteProcessGroupStatusSnapshot.transmissionStatus === 'Transmitting') {
return rpg.remoteProcessGroupStatusSnapshot.transmissionStatus;
} else {
return 'Not Transmitting';
}
}
formatUri(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
return rpg.remoteProcessGroupStatusSnapshot.targetUri;
}
formatSent(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
return rpg.remoteProcessGroupStatusSnapshot.sent;
}
formatReceived(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
return rpg.remoteProcessGroupStatusSnapshot.received;
}
getTransmissionStatusIcon(rpg: RemoteProcessGroupStatusSnapshotEntity): string {
if (rpg.remoteProcessGroupStatusSnapshot.transmissionStatus === 'Transmitting') {
return 'transmitting fa fa-bullseye';
} else {
return 'not-transmitting icon icon-transmit-false';
}
}
private supportsMultiValuedSort(sort: Sort): boolean {
switch (sort.active) {
case 'sent':
case 'received':
return true;
default:
return false;
}
}
private setMultiSort(sort: Sort) {
const { active, direction, sortValueIndex, totalValues } = this.multiSort;
if (this.supportsMultiValuedSort(sort)) {
if (active === sort.active) {
// previous sort was of the same column
if (direction === 'desc' && sort.direction === 'asc') {
// change from previous index to the next
const newIndex = sortValueIndex + 1 >= totalValues ? 0 : sortValueIndex + 1;
this.multiSort = { ...sort, sortValueIndex: newIndex, totalValues };
} else {
this.multiSort = { ...sort, sortValueIndex, totalValues };
}
} else {
// sorting a different column, just reset
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
} else {
this.multiSort = { ...sort, sortValueIndex: 0, totalValues };
}
}
private sortEntities(
data: RemoteProcessGroupStatusSnapshotEntity[],
sort: Sort
): RemoteProcessGroupStatusSnapshotEntity[] {
if (!data) {
return [];
}
return data.slice().sort((a, b) => {
const isAsc: boolean = sort.direction === 'asc';
let retVal: number = 0;
switch (sort.active) {
case 'name':
retVal = this.nifiCommon.compareString(
a.remoteProcessGroupStatusSnapshot.name,
b.remoteProcessGroupStatusSnapshot.name
);
break;
case 'transmitting':
retVal = this.nifiCommon.compareString(
a.remoteProcessGroupStatusSnapshot.transmissionStatus,
b.remoteProcessGroupStatusSnapshot.transmissionStatus
);
break;
case 'uri':
retVal = this.nifiCommon.compareString(
a.remoteProcessGroupStatusSnapshot.targetUri,
b.remoteProcessGroupStatusSnapshot.targetUri
);
break;
case 'sent':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.remoteProcessGroupStatusSnapshot.flowFilesSent,
b.remoteProcessGroupStatusSnapshot.flowFilesSent
);
} else {
retVal = this.nifiCommon.compareNumber(
a.remoteProcessGroupStatusSnapshot.bytesSent,
b.remoteProcessGroupStatusSnapshot.bytesSent
);
}
break;
case 'received':
if (this.multiSort.sortValueIndex === 0) {
retVal = this.nifiCommon.compareNumber(
a.remoteProcessGroupStatusSnapshot.flowFilesReceived,
b.remoteProcessGroupStatusSnapshot.flowFilesReceived
);
} else {
retVal = this.nifiCommon.compareNumber(
a.remoteProcessGroupStatusSnapshot.bytesReceived,
b.remoteProcessGroupStatusSnapshot.bytesReceived
);
}
break;
default:
retVal = 0;
}
return retVal * (isAsc ? 1 : -1);
});
}
}

View File

@ -136,6 +136,32 @@ export class NiFiCommon {
return Array.isArray(arr) ? arr.length === 0 : true;
}
/**
* Determines if a string contains another, optionally looking case insensitively.
*
* @param stringToSearch
* @param stringToFind
* @param caseInsensitive
*/
public stringContains(
stringToSearch: string | null | undefined,
stringToFind: string | null | undefined,
caseInsensitive: boolean = false
): boolean {
if (this.isBlank(stringToSearch)) {
return false;
}
if (this.isBlank(stringToFind)) {
return true;
}
if (caseInsensitive) {
// @ts-ignore
return stringToSearch.toLowerCase().indexOf(stringToFind.toLowerCase()) >= 0;
}
// @ts-ignore
return stringToSearch.indexOf(stringToFind) >= 0;
}
/**
* Formats the class name of this component.
*

View File

@ -18,6 +18,7 @@
.listing-table {
table {
width: 100%;
table-layout: fixed;
td,
th {
@ -74,3 +75,7 @@
::ng-deep .mat-sort-header-arrow {
color: #fff;
}
::ng-deep .mat-sort-header-content {
overflow: hidden;
}