NIFI-12807: Handle clustering in Provenance, Lineage, and Queue Listing (#8431)

* NIFI-12807:
- Handling cluster node id in provenance listing, lineage graph, and queue listing.

* NIFI-12807:
- Addressing review feedback.

This closes #8431
This commit is contained in:
Matt Gilman 2024-02-21 11:20:49 -05:00 committed by GitHub
parent 0a2ba317c0
commit 6c76ecadd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 686 additions and 148 deletions

View File

@ -64,7 +64,8 @@
"buildTarget": "nifi:build:production"
},
"development": {
"buildTarget": "nifi:build:development"
"buildTarget": "nifi:build:development",
"servePath": "/nifi"
}
},
"defaultConfiguration": "development"

View File

@ -9,9 +9,5 @@ const target = {
};
export default {
'/nifi-api/*': target,
'/nifi-docs/*': target,
'/nifi-content-viewer/*': target,
// the following entry is needed because the content viewer (and other UIs) load resources from existing nifi ui
'/nifi/*': target
'/': target
};

View File

@ -46,6 +46,7 @@ import { ErrorEffects } from './state/error/error.effects';
import { MatSnackBarModule } from '@angular/material/snack-bar';
import { PipesModule } from './pipes/pipes.module';
import { DocumentationEffects } from './state/documentation/documentation.effects';
import { ClusterSummaryEffects } from './state/cluster-summary/cluster-summary.effects';
@NgModule({
declarations: [AppComponent],
@ -73,7 +74,8 @@ import { DocumentationEffects } from './state/documentation/documentation.effect
ControllerServiceStateEffects,
SystemDiagnosticsEffects,
ComponentStateEffects,
DocumentationEffects
DocumentationEffects,
ClusterSummaryEffects
),
StoreDevtoolsModule.instrument({
maxAge: 25,

View File

@ -68,10 +68,6 @@ export class FlowService implements PropertyDescriptorRetriever {
return this.httpClient.get(`${FlowService.API}/flow/status`);
}
getClusterSummary(): Observable<any> {
return this.httpClient.get(`${FlowService.API}/flow/cluster/summary`);
}
getControllerBulletins(): Observable<any> {
return this.httpClient.get(`${FlowService.API}/flow/controller/bulletins`);
}

View File

@ -98,7 +98,6 @@ import { ImportFromRegistry } from '../../ui/canvas/items/flow/import-from-regis
import { selectCurrentUser } from '../../../../state/current-user/current-user.selectors';
import { NoRegistryClientsDialog } from '../../ui/common/no-registry-clients-dialog/no-registry-clients-dialog.component';
import { EditRemoteProcessGroup } from '../../ui/canvas/items/remote-process-group/edit-remote-process-group/edit-remote-process-group.component';
import { ErrorHelper } from '../../../../service/error-helper.service';
@Injectable()
export class FlowEffects {
@ -144,16 +143,14 @@ export class FlowEffects {
combineLatest([
this.flowService.getFlow(request.id),
this.flowService.getFlowStatus(),
this.flowService.getClusterSummary(),
this.flowService.getControllerBulletins()
]).pipe(
map(([flow, flowStatus, clusterSummary, controllerBulletins]) => {
map(([flow, flowStatus, controllerBulletins]) => {
return FlowActions.loadProcessGroupSuccess({
response: {
id: request.id,
flow: flow,
flowStatus: flowStatus,
clusterSummary: clusterSummary.clusterSummary,
controllerBulletins: controllerBulletins
}
});

View File

@ -123,13 +123,6 @@ export const initialState: FlowState = {
syncFailureCount: undefined
}
},
clusterSummary: {
clustered: false,
connectedToCluster: false,
connectedNodes: '',
connectedNodeCount: 0,
totalNodeCount: 0
},
refreshRpgDetails: null,
controllerBulletins: {
bulletins: [],
@ -182,7 +175,6 @@ export const flowReducer = createReducer(
id: response.flow.processGroupFlow.id,
flow: response.flow,
flowStatus: response.flowStatus,
clusterSummary: response.clusterSummary,
controllerBulletins: response.controllerBulletins,
error: null,
status: 'success' as const

View File

@ -228,8 +228,6 @@ export const selectLastRefreshed = createSelector(
(state: FlowState) => state.flow.processGroupFlow.lastRefreshed
);
export const selectClusterSummary = createSelector(selectFlowState, (state: FlowState) => state.clusterSummary);
export const selectControllerBulletins = createSelector(
selectFlowState,
(state: FlowState) => state.controllerBulletins.bulletins // TODO - include others?

View File

@ -62,7 +62,6 @@ export interface LoadProcessGroupResponse {
id: string;
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
clusterSummary: ClusterSummary;
controllerBulletins: ControllerBulletinsEntity;
}
@ -493,14 +492,6 @@ export interface ControllerStatusEntity {
controllerStatus: ControllerStatus;
}
export interface ClusterSummary {
clustered: boolean;
connectedToCluster: boolean;
connectedNodes?: string;
connectedNodeCount: number;
totalNodeCount: number;
}
export interface ControllerBulletinsEntity {
bulletins: BulletinEntity[];
controllerServiceBulletins: BulletinEntity[];
@ -514,7 +505,6 @@ export interface FlowState {
flow: ProcessGroupFlowEntity;
flowStatus: ControllerStatusEntity;
refreshRpgDetails: RefreshRemoteProcessGroupPollingDetailsRequest | null;
clusterSummary: ClusterSummary;
controllerBulletins: ControllerBulletinsEntity;
dragging: boolean;
transitionRequired: boolean;

View File

@ -67,6 +67,11 @@ import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow
import { concatLatestFrom } from '@ngrx/effects';
import { selectUrl } from '../../../../state/router/router.selectors';
import { Storage } from '../../../../service/storage.service';
import {
loadClusterSummary,
startClusterSummaryPolling,
stopClusterSummaryPolling
} from '../../../../state/cluster-summary/cluster-summary.actions';
@Component({
selector: 'fd-canvas',
@ -285,7 +290,9 @@ export class Canvas implements OnInit, OnDestroy {
this.canvasView.init(this.viewContainerRef, this.svg, this.canvas);
this.store.dispatch(loadFlowConfiguration());
this.store.dispatch(loadClusterSummary());
this.store.dispatch(startProcessGroupPolling());
this.store.dispatch(startClusterSummaryPolling());
}
private createSvg(): void {
@ -595,5 +602,6 @@ export class Canvas implements OnInit, OnDestroy {
ngOnDestroy(): void {
this.store.dispatch(resetFlowState());
this.store.dispatch(stopProcessGroupPolling());
this.store.dispatch(stopClusterSummaryPolling());
}
}

View File

@ -43,6 +43,10 @@
color: $primary-palette-500;
}
.warning {
color: $warn-palette-400;
}
.status-value {
color: $warn-palette-A400;
}

View File

@ -18,7 +18,7 @@
<div class="h-8 flow-status">
<div class="flex justify-between">
<div class="flex flex-1 justify-around pr-20">
@if (clusterSummary.clustered) {
@if (clusterSummary?.clustered) {
<div class="flex items-center gap-x-2" title="Connected nodes / Total number of nodes in the cluster">
<div class="fa fa-cubes" [class]="getClusterStyle()"></div>
<div class="text">{{ formatClusterMessage() }}</div>

View File

@ -16,13 +16,14 @@
*/
import { Component, Input } from '@angular/core';
import { ClusterSummary, ControllerStatus } from '../../../../state/flow';
import { ControllerStatus } from '../../../../state/flow';
import { initialState } from '../../../../state/flow/flow.reducer';
import { BulletinsTip } from '../../../../../../ui/common/tooltips/bulletins-tip/bulletins-tip.component';
import { BulletinEntity, BulletinsTipInput } from '../../../../../../state/shared';
import { Search } from '../search/search.component';
import { NifiTooltipDirective } from '../../../../../../ui/common/tooltips/nifi-tooltip.directive';
import { ClusterSummary } from '../../../../../../state/cluster-summary';
@Component({
selector: 'flow-status',
@ -34,7 +35,7 @@ import { NifiTooltipDirective } from '../../../../../../ui/common/tooltips/nifi-
export class FlowStatus {
@Input() controllerStatus: ControllerStatus = initialState.flowStatus.controllerStatus;
@Input() lastRefreshed: string = initialState.flow.processGroupFlow.lastRefreshed;
@Input() clusterSummary: ClusterSummary = initialState.clusterSummary;
@Input() clusterSummary: ClusterSummary | null = null;
@Input() bulletins: BulletinEntity[] = initialState.controllerBulletins.bulletins;
@Input() currentProcessGroupId: string = initialState.id;
@Input() loadingStatus = false;
@ -46,7 +47,7 @@ export class FlowStatus {
}
formatClusterMessage(): string {
if (this.clusterSummary.connectedToCluster && this.clusterSummary.connectedNodes) {
if (this.clusterSummary?.connectedToCluster && this.clusterSummary.connectedNodes) {
return this.clusterSummary.connectedNodes;
} else {
return 'Disconnected';
@ -55,8 +56,8 @@ export class FlowStatus {
getClusterStyle(): string {
if (
!this.clusterSummary.connectedToCluster ||
this.clusterSummary.connectedNodeCount != this.clusterSummary.totalNodeCount
this.clusterSummary?.connectedToCluster === false ||
this.clusterSummary?.connectedNodeCount != this.clusterSummary?.totalNodeCount
) {
return 'warning';
}

View File

@ -24,16 +24,14 @@ import { HttpClientTestingModule } from '@angular/common/http/testing';
import { NewCanvasItem } from './new-canvas-item/new-canvas-item.component';
import { MatMenuModule } from '@angular/material/menu';
import { MatDividerModule } from '@angular/material/divider';
import {
selectClusterSummary,
selectControllerBulletins,
selectControllerStatus
} from '../../../state/flow/flow.selectors';
import { ClusterSummary, ControllerStatus } from '../../../state/flow';
import { selectControllerBulletins, selectControllerStatus } from '../../../state/flow/flow.selectors';
import { ControllerStatus } from '../../../state/flow';
import { CdkConnectedOverlay, CdkOverlayOrigin } from '@angular/cdk/overlay';
import { FormsModule, ReactiveFormsModule } from '@angular/forms';
import { Component } from '@angular/core';
import { RouterTestingModule } from '@angular/router/testing';
import { ClusterSummary } from '../../../../../state/cluster-summary';
import { selectClusterSummary } from '../../../../../state/cluster-summary/cluster-summary.selectors';
describe('HeaderComponent', () => {
let component: HeaderComponent;

View File

@ -21,7 +21,6 @@ import { Store } from '@ngrx/store';
import { CanvasState } from '../../../state';
import {
selectCanvasPermissions,
selectClusterSummary,
selectControllerBulletins,
selectControllerStatus,
selectCurrentProcessGroupId,
@ -36,6 +35,7 @@ import { MatDividerModule } from '@angular/material/divider';
import { RouterLink } from '@angular/router';
import { FlowStatus } from './flow-status/flow-status.component';
import { Navigation } from '../../../../../ui/common/navigation/navigation.component';
import { selectClusterSummary } from '../../../../../state/cluster-summary/cluster-summary.selectors';
@Component({
selector: 'fd-header',

View File

@ -22,7 +22,6 @@ import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { ParameterContextEntity } from '../../../state/parameter-context-listing';
import { FlowConfiguration } from '../../../../../state/flow-configuration';
import { CurrentUser } from '../../../../../state/current-user';
import { ParameterProviderConfigurationEntity } from '../../../../../state/shared';
@Component({
selector: 'parameter-context-table',

View File

@ -17,7 +17,7 @@
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { HttpClient, HttpParams } from '@angular/common/http';
import { ProvenanceRequest } from '../state/provenance-event-listing';
import { LineageRequest } from '../state/lineage';
@ -36,28 +36,44 @@ export class ProvenanceService {
}
getProvenanceQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.get(`${ProvenanceService.API}/provenance/${encodeURIComponent(id)}`);
let params = new HttpParams().set('summarize', true).set('incrementalResults', false);
if (clusterNodeId) {
params = params.set('clusterNodeId', clusterNodeId);
}
return this.httpClient.get(`${ProvenanceService.API}/provenance/${encodeURIComponent(id)}`, { params });
}
deleteProvenanceQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.delete(`${ProvenanceService.API}/provenance/${encodeURIComponent(id)}`);
let params = new HttpParams();
if (clusterNodeId) {
params = params.set('clusterNodeId', clusterNodeId);
}
getProvenanceEvent(id: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.get(`${ProvenanceService.API}/provenance-events/${encodeURIComponent(id)}`);
return this.httpClient.delete(`${ProvenanceService.API}/provenance/${encodeURIComponent(id)}`, { params });
}
downloadContent(id: string, direction: string): void {
getProvenanceEvent(eventId: number, clusterNodeId?: string): Observable<any> {
let params = new HttpParams();
if (clusterNodeId) {
params = params.set('clusterNodeId', clusterNodeId);
}
return this.httpClient.get(`${ProvenanceService.API}/provenance-events/${encodeURIComponent(eventId)}`, {
params
});
}
downloadContent(eventId: number, direction: string, clusterNodeId?: string): void {
let dataUri = `${ProvenanceService.API}/provenance-events/${encodeURIComponent(
id
eventId
)}/content/${encodeURIComponent(direction)}`;
const queryParameters: any = {};
// TODO - cluster node id in query parameters
if (clusterNodeId) {
queryParameters['clusterNodeId'] = clusterNodeId;
}
if (Object.keys(queryParameters).length > 0) {
const query: string = new URLSearchParams(queryParameters).toString();
@ -67,13 +83,23 @@ export class ProvenanceService {
window.open(dataUri);
}
viewContent(nifiUrl: string, contentViewerUrl: string, id: string, direction: string): void {
viewContent(
nifiUrl: string,
contentViewerUrl: string,
eventId: number,
direction: string,
clusterNodeId?: string
): void {
// build the uri to the data
let dataUri = `${nifiUrl}provenance-events/${encodeURIComponent(id)}/content/${encodeURIComponent(direction)}`;
let dataUri = `${nifiUrl}provenance-events/${encodeURIComponent(eventId)}/content/${encodeURIComponent(
direction
)}`;
const dataUriParameters: any = {};
// TODO - cluster node id in data uri parameters
if (clusterNodeId) {
dataUriParameters['clusterNodeId'] = clusterNodeId;
}
// include parameters if necessary
if (Object.keys(dataUriParameters).length > 0) {
@ -100,12 +126,14 @@ export class ProvenanceService {
window.open(`${contentViewer}${contentViewerQuery}`);
}
replay(eventId: string): Observable<any> {
replay(eventId: number, clusterNodeId?: string): Observable<any> {
const payload: any = {
eventId
};
// TODO - add cluster node id in payload
if (clusterNodeId) {
payload['clusterNodeId'] = clusterNodeId;
}
return this.httpClient.post(`${ProvenanceService.API}/provenance-events/replays`, payload);
}
@ -115,12 +143,22 @@ export class ProvenanceService {
}
getLineageQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.get(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`);
let params = new HttpParams();
if (clusterNodeId) {
params = params.set('clusterNodeId', clusterNodeId);
}
return this.httpClient.get(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`, { params });
}
deleteLineageQuery(id: string, clusterNodeId?: string): Observable<any> {
// TODO - cluster node id
return this.httpClient.delete(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`);
let params = new HttpParams();
if (clusterNodeId) {
params = params.set('clusterNodeId', clusterNodeId);
}
return this.httpClient.delete(`${ProvenanceService.API}/provenance/lineage/${encodeURIComponent(id)}`, {
params
});
}
}

View File

@ -24,8 +24,7 @@ import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { ProvenanceService } from '../../service/provenance.service';
import { Lineage } from './index';
import { selectClusterNodeId } from '../provenance-event-listing/provenance-event-listing.selectors';
import { selectActiveLineageId } from './lineage.selectors';
import { selectActiveLineageId, selectClusterNodeIdFromActiveLineage } from './lineage.selectors';
import * as ErrorActions from '../../../../state/error/error.actions';
import { ErrorHelper } from '../../../../service/error-helper.service';
import { HttpErrorResponse } from '@angular/common/http';
@ -105,7 +104,7 @@ export class LineageEffects {
ofType(LineageActions.pollLineageQuery),
concatLatestFrom(() => [
this.store.select(selectActiveLineageId).pipe(isDefinedAndNotNull()),
this.store.select(selectClusterNodeId)
this.store.select(selectClusterNodeIdFromActiveLineage)
]),
switchMap(([, id, clusterNodeId]) =>
from(this.provenanceService.getLineageQuery(id, clusterNodeId)).pipe(
@ -153,7 +152,10 @@ export class LineageEffects {
deleteLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.deleteLineageQuery),
concatLatestFrom(() => [this.store.select(selectActiveLineageId), this.store.select(selectClusterNodeId)]),
concatLatestFrom(() => [
this.store.select(selectActiveLineageId),
this.store.select(selectClusterNodeIdFromActiveLineage)
]),
tap(([, id, clusterNodeId]) => {
if (id) {
this.provenanceService.deleteLineageQuery(id, clusterNodeId).subscribe();

View File

@ -32,3 +32,8 @@ export const selectCompletedLineage = createSelector(
);
export const selectActiveLineageId = createSelector(selectActiveLineage, (state: Lineage | null) => state?.id);
export const selectClusterNodeIdFromActiveLineage = createSelector(
selectActiveLineage,
(state: Lineage | null) => state?.request.clusterNodeId
);

View File

@ -16,6 +16,7 @@
*/
import { ProvenanceEventSummary } from '../../../../state/shared';
import { NodeSearchResult } from '../../../../state/cluster-summary';
export const provenanceEventListingFeatureKey = 'provenanceEventListing';
@ -33,14 +34,15 @@ export interface ProvenanceQueryResponse {
}
export interface ProvenanceEventRequest {
id: string;
eventId: number;
clusterNodeId?: string;
}
export interface GoToProvenanceEventSourceRequest {
eventId?: string;
eventId?: number;
componentId?: string;
groupId?: string;
clusterNodeId?: string;
}
export interface SearchableField {
@ -54,8 +56,13 @@ export interface ProvenanceOptions {
searchableFields: SearchableField[];
}
export interface OpenSearchRequest {
clusterNodes: NodeSearchResult[];
}
export interface ProvenanceSearchDialogRequest {
timeOffset: number;
clusterNodes: NodeSearchResult[];
options: ProvenanceOptions;
currentRequest: ProvenanceRequest;
}

View File

@ -18,6 +18,7 @@
import { createAction, props } from '@ngrx/store';
import {
GoToProvenanceEventSourceRequest,
OpenSearchRequest,
ProvenanceEventRequest,
ProvenanceOptionsResponse,
ProvenanceQueryResponse,
@ -78,7 +79,14 @@ export const goToProvenanceEventSource = createAction(
props<{ request: GoToProvenanceEventSourceRequest }>()
);
export const openSearchDialog = createAction('[Provenance Event Listing] Open Search Dialog');
export const loadClusterNodesAndOpenSearchDialog = createAction(
'[Provenance Event Listing] Load Cluster Nodes And Open Search Dialog'
);
export const openSearchDialog = createAction(
'[Provenance Event Listing] Open Search Dialog',
props<{ request: OpenSearchRequest }>()
);
export const saveProvenanceRequest = createAction(
'[Provenance Event Listing] Save Provenance Request',

View File

@ -26,7 +26,7 @@ import { Router } from '@angular/router';
import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
import { ProvenanceService } from '../../service/provenance.service';
import {
selectClusterNodeId,
selectClusterNodeIdFromActiveProvenance,
selectActiveProvenanceId,
selectProvenanceOptions,
selectProvenanceRequest,
@ -41,6 +41,8 @@ import * as ErrorActions from '../../../../state/error/error.actions';
import { ErrorHelper } from '../../../../service/error-helper.service';
import { HttpErrorResponse } from '@angular/common/http';
import { isDefinedAndNotNull } from '../../../../state/shared';
import { selectClusterSummary } from '../../../../state/cluster-summary/cluster-summary.selectors';
import { ClusterService } from '../../../../service/cluster.service';
@Injectable()
export class ProvenanceEventListingEffects {
@ -49,6 +51,7 @@ export class ProvenanceEventListingEffects {
private store: Store<NiFiState>,
private provenanceService: ProvenanceService,
private errorHelper: ErrorHelper,
private clusterService: ClusterService,
private dialog: MatDialog,
private router: Router
) {}
@ -166,7 +169,7 @@ export class ProvenanceEventListingEffects {
ofType(ProvenanceEventListingActions.pollProvenanceQuery),
concatLatestFrom(() => [
this.store.select(selectActiveProvenanceId).pipe(isDefinedAndNotNull()),
this.store.select(selectClusterNodeId)
this.store.select(selectClusterNodeIdFromActiveProvenance)
]),
switchMap(([, id, clusterNodeId]) =>
from(this.provenanceService.getProvenanceQuery(id, clusterNodeId)).pipe(
@ -216,7 +219,7 @@ export class ProvenanceEventListingEffects {
ofType(ProvenanceEventListingActions.deleteProvenanceQuery),
concatLatestFrom(() => [
this.store.select(selectActiveProvenanceId),
this.store.select(selectClusterNodeId)
this.store.select(selectClusterNodeIdFromActiveProvenance)
]),
tap(([, id, clusterNodeId]) => {
this.dialog.closeAll();
@ -229,20 +232,53 @@ export class ProvenanceEventListingEffects {
)
);
loadClusterNodesAndOpenSearchDialog$ = createEffect(() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.loadClusterNodesAndOpenSearchDialog),
concatLatestFrom(() => this.store.select(selectClusterSummary).pipe(isDefinedAndNotNull())),
switchMap(([, clusterSummary]) => {
if (clusterSummary.connectedToCluster) {
return from(this.clusterService.searchCluster()).pipe(
map((response) =>
ProvenanceEventListingActions.openSearchDialog({
request: {
clusterNodes: response.nodeResults
}
})
),
catchError((errorResponse: HttpErrorResponse) =>
of(ErrorActions.snackBarError({ error: errorResponse.error }))
)
);
} else {
return of(
ProvenanceEventListingActions.openSearchDialog({
request: {
clusterNodes: []
}
})
);
}
})
)
);
openSearchDialog$ = createEffect(
() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.openSearchDialog),
map((action) => action.request),
concatLatestFrom(() => [
this.store.select(selectTimeOffset),
this.store.select(selectProvenanceOptions),
this.store.select(selectProvenanceRequest),
this.store.select(selectAbout).pipe(isDefinedAndNotNull())
]),
tap(([, timeOffset, options, currentRequest, about]) => {
tap(([request, timeOffset, options, currentRequest, about]) => {
const dialogReference = this.dialog.open(ProvenanceSearchDialog, {
data: {
timeOffset,
clusterNodes: request.clusterNodes,
options,
currentRequest
},
@ -283,7 +319,7 @@ export class ProvenanceEventListingEffects {
map((action) => action.request),
concatLatestFrom(() => this.store.select(selectAbout)),
tap(([request, about]) => {
this.provenanceService.getProvenanceEvent(request.id).subscribe({
this.provenanceService.getProvenanceEvent(request.eventId, request.clusterNodeId).subscribe({
next: (response) => {
const dialogReference = this.dialog.open(ProvenanceEventDialog, {
data: {
@ -298,7 +334,11 @@ export class ProvenanceEventListingEffects {
dialogReference.componentInstance.downloadContent
.pipe(takeUntil(dialogReference.afterClosed()))
.subscribe((direction: string) => {
this.provenanceService.downloadContent(request.id, direction);
this.provenanceService.downloadContent(
request.eventId,
direction,
request.clusterNodeId
);
});
if (about) {
@ -308,8 +348,9 @@ export class ProvenanceEventListingEffects {
this.provenanceService.viewContent(
about.uri,
about.contentViewerUrl,
request.id,
direction
request.eventId,
direction,
request.clusterNodeId
);
});
}
@ -319,7 +360,7 @@ export class ProvenanceEventListingEffects {
.subscribe(() => {
dialogReference.close();
this.provenanceService.replay(request.id).subscribe({
this.provenanceService.replay(request.eventId, request.clusterNodeId).subscribe({
next: () => {
this.store.dispatch(
ProvenanceEventListingActions.showOkDialog({
@ -356,7 +397,7 @@ export class ProvenanceEventListingEffects {
map((action) => action.request),
tap((request) => {
if (request.eventId) {
this.provenanceService.getProvenanceEvent(request.eventId).subscribe({
this.provenanceService.getProvenanceEvent(request.eventId, request.clusterNodeId).subscribe({
next: (response) => {
const event: any = response.provenanceEvent;
this.router.navigate(this.getEventComponentLink(event.groupId, event.componentId));

View File

@ -22,7 +22,6 @@ import {
provenanceEventListingFeatureKey,
ProvenanceEventListingState,
ProvenanceQueryParams,
ProvenanceRequest,
ProvenanceResults
} from './index';
import { selectCurrentRoute } from '../../../../state/router/router.selectors';
@ -78,9 +77,9 @@ export const selectCompletedProvenance = createSelector(
export const selectActiveProvenanceId = createSelector(selectActiveProvenance, (state: Provenance | null) => state?.id);
export const selectClusterNodeId = createSelector(
selectProvenanceRequest,
(state: ProvenanceRequest | null) => state?.clusterNodeId
export const selectClusterNodeIdFromActiveProvenance = createSelector(
selectActiveProvenance,
(state: Provenance | null) => state?.request.clusterNodeId
);
export const selectProvenanceResults = createSelector(

View File

@ -23,6 +23,7 @@
[loading]="status === 'loading'"
[loadedTimestamp]="(loadedTimestamp$ | async)!"
[events]="provenance.results.provenanceEvents"
[clusterSummary]="(clusterSummary$ | async)!"
[oldestEventAvailable]="provenance.results.oldestEvent"
[timeOffset]="provenance.results.timeOffset"
[resultsMessage]="getResultsMessage(provenance)"

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
import { Component, OnDestroy } from '@angular/core';
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Store } from '@ngrx/store';
import {
GoToProvenanceEventSourceRequest,
@ -37,8 +37,8 @@ import { filter, map, take, tap } from 'rxjs';
import {
clearProvenanceRequest,
goToProvenanceEventSource,
loadClusterNodesAndOpenSearchDialog,
openProvenanceEventDialog,
openSearchDialog,
resetProvenanceState,
resubmitProvenanceQuery,
saveProvenanceRequest
@ -48,17 +48,20 @@ import { resetLineage, submitLineageQuery } from '../../state/lineage/lineage.ac
import { LineageRequest } from '../../state/lineage';
import { selectCompletedLineage } from '../../state/lineage/lineage.selectors';
import { clearBannerErrors } from '../../../../state/error/error.actions';
import { selectClusterSummary } from '../../../../state/cluster-summary/cluster-summary.selectors';
import { loadClusterSummary } from '../../../../state/cluster-summary/cluster-summary.actions';
@Component({
selector: 'provenance-event-listing',
templateUrl: './provenance-event-listing.component.html',
styleUrls: ['./provenance-event-listing.component.scss']
})
export class ProvenanceEventListing implements OnDestroy {
export class ProvenanceEventListing implements OnInit, OnDestroy {
status$ = this.store.select(selectStatus);
loadedTimestamp$ = this.store.select(selectLoadedTimestamp);
provenance$ = this.store.select(selectCompletedProvenance);
lineage$ = this.store.select(selectCompletedLineage);
clusterSummary$ = this.store.select(selectClusterSummary);
request!: ProvenanceRequest;
stateReset = false;
@ -137,6 +140,10 @@ export class ProvenanceEventListing implements OnDestroy {
});
}
ngOnInit(): void {
this.store.dispatch(loadClusterSummary());
}
getResultsMessage(provenance: Provenance): string {
const request: ProvenanceRequest = provenance.request;
const results: ProvenanceResults = provenance.results;
@ -166,7 +173,7 @@ export class ProvenanceEventListing implements OnDestroy {
}
openSearchCriteria(): void {
this.store.dispatch(openSearchDialog());
this.store.dispatch(loadClusterNodesAndOpenSearchDialog());
}
openEventDialog(request: ProvenanceEventRequest): void {

View File

@ -44,10 +44,12 @@ export class LineageComponent implements OnInit {
@Input() set lineage(lineage: Lineage) {
if (lineage && lineage.finished) {
this.addLineage(lineage.results.nodes, lineage.results.links);
this.clusterNodeId = lineage.request.clusterNodeId;
}
}
@Input() eventId: string | null = null;
@Input() eventId: number | null = null;
@Input() set eventTimestampThreshold(eventTimestampThreshold: number) {
if (this.previousEventTimestampThreshold >= 0) {
@ -136,9 +138,9 @@ export class LineageComponent implements OnInit {
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO cluster node id
this.openEventDialog.next({
id: selectionData.id
eventId: Number(selectionData.id),
clusterNodeId: this.clusterNodeId
});
}
},
@ -151,7 +153,8 @@ export class LineageComponent implements OnInit {
action: (selection: any) => {
const selectionData: any = selection.datum();
this.goToProvenanceEventSource.next({
eventId: selectionData.id
eventId: Number(selectionData.id),
clusterNodeId: this.clusterNodeId
});
}
},
@ -169,11 +172,10 @@ export class LineageComponent implements OnInit {
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO - cluster node id
this.submitLineageQuery.next({
lineageRequestType: 'PARENTS',
eventId: selectionData.id
// clusterNodeId: clusterNodeId
eventId: selectionData.id,
clusterNodeId: this.clusterNodeId
});
}
},
@ -191,11 +193,10 @@ export class LineageComponent implements OnInit {
action: (selection: any) => {
const selectionData: any = selection.datum();
// TODO - cluster node id
this.submitLineageQuery.next({
lineageRequestType: 'CHILDREN',
eventId: selectionData.id
// clusterNodeId: clusterNodeId
eventId: selectionData.id,
clusterNodeId: this.clusterNodeId
});
}
},
@ -227,17 +228,17 @@ export class LineageComponent implements OnInit {
private nodeLookup: Map<string, any> = new Map<string, any>();
private linkLookup: Map<string, any> = new Map<string, any>();
private previousEventTimestampThreshold = -1;
private clusterNodeId: string | undefined;
constructor() {
this.allMenus = new Map<string, ContextMenuDefinition>();
this.allMenus.set(this.ROOT_MENU.id, this.ROOT_MENU);
const self: LineageComponent = this;
this.lineageContextmenu = {
getMenu(menuId: string): ContextMenuDefinition | undefined {
return self.allMenus.get(menuId);
getMenu: (menuId: string): ContextMenuDefinition | undefined => {
return this.allMenus.get(menuId);
},
filterMenuItem(menuItem: ContextMenuItemDefinition): boolean {
filterMenuItem: (menuItem: ContextMenuItemDefinition): boolean => {
// include if the condition matches
if (menuItem.condition) {
const selection: any = d3.select('circle.context');
@ -247,7 +248,7 @@ export class LineageComponent implements OnInit {
// include if there is no condition (non conditional item, separator, sub menu, etc)
return true;
},
menuItemClicked(menuItem: ContextMenuItemDefinition) {
menuItemClicked: (menuItem: ContextMenuItemDefinition): void => {
if (menuItem.action) {
const selection: any = d3.select('circle.context');
return menuItem.action(selection);
@ -794,9 +795,9 @@ export class LineageComponent implements OnInit {
})
.on('dblclick', (event: MouseEvent, d: any) => {
// show the event details
// TODO - cluster node id
this.openEventDialog.next({
id: d.id
eventId: Number(d.id),
clusterNodeId: this.clusterNodeId
});
});
@ -817,7 +818,7 @@ export class LineageComponent implements OnInit {
.append('circle')
.attr('class', 'event-circle')
.classed('selected', (d: any) => {
return d.id === this.eventId;
return d.id === String(this.eventId);
})
.attr('r', 8)
.attr('stroke-width', 1.0)

View File

@ -130,6 +130,16 @@
</td>
</ng-container>
<!-- Node Column -->
@if (displayedColumns.includes('node')) {
<ng-container matColumnDef="node">
<th mat-header-cell *matHeaderCellDef mat-sort-header>Node</th>
<td mat-cell *matCellDef="let item" [title]="item.clusterNodeAddress">
{{ item.clusterNodeAddress }}
</td>
</ng-container>
}
<!-- Actions Column -->
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>

View File

@ -18,6 +18,11 @@
.provenance-event-table {
.listing-table {
table {
.mat-column-moreDetails {
min-width: 50px;
width: 50px;
}
.mat-column-actions {
min-width: 50px;
width: 50px;

View File

@ -39,6 +39,7 @@ import { GoToProvenanceEventSourceRequest, ProvenanceEventRequest } from '../../
import { MatSliderModule } from '@angular/material/slider';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { ErrorBanner } from '../../../../../ui/common/error-banner/error-banner.component';
import { ClusterSummary } from '../../../../../state/cluster-summary';
@Component({
selector: 'provenance-event-table',
@ -73,8 +74,10 @@ export class ProvenanceEventTable implements AfterViewInit {
return this.nifiCommon.stringContains(data.componentName, filterTerm, true);
} else if (filterColumn === this.filterColumnOptions[1]) {
return this.nifiCommon.stringContains(data.componentType, filterTerm, true);
} else {
} else if (filterColumn === this.filterColumnOptions[2]) {
return this.nifiCommon.stringContains(data.eventType, filterTerm, true);
} else {
return this.nifiCommon.stringContains(data.clusterNodeAddress, filterTerm, true);
}
};
this.totalCount = events.length;
@ -98,6 +101,30 @@ export class ProvenanceEventTable implements AfterViewInit {
@Input() loading!: boolean;
@Input() loadedTimestamp!: string;
@Input() set clusterSummary(clusterSummary: ClusterSummary) {
if (clusterSummary?.connectedToCluster) {
// if we're connected to the cluster add a node column if it's not already present
if (!this.displayedColumns.includes('node')) {
this.displayedColumns.splice(this.displayedColumns.length - 1, 0, 'node');
}
if (!this.filterColumnOptions.includes('node')) {
this.filterColumnOptions.push('node');
}
} else {
// if we're not connected to the cluster remove the node column if it is present
const nodeIndex = this.displayedColumns.indexOf('node');
if (nodeIndex > -1) {
this.displayedColumns.splice(nodeIndex, 1);
}
const filterNodeIndex = this.filterColumnOptions.indexOf('node');
if (filterNodeIndex > -1) {
this.filterColumnOptions.splice(filterNodeIndex, 1);
}
}
}
@Input() set lineage$(lineage$: Observable<Lineage | null>) {
this.provenanceLineage$ = lineage$.pipe(
tap((lineage) => {
@ -156,7 +183,6 @@ export class ProvenanceEventTable implements AfterViewInit {
protected readonly ValidationErrorsTip = ValidationErrorsTip;
private destroyRef: DestroyRef = inject(DestroyRef);
// TODO - conditionally include the cluster column
displayedColumns: string[] = [
'moreDetails',
'eventTime',
@ -168,7 +194,7 @@ export class ProvenanceEventTable implements AfterViewInit {
'actions'
];
dataSource: MatTableDataSource<ProvenanceEventSummary> = new MatTableDataSource<ProvenanceEventSummary>();
selectedEventId: string | null = null;
selectedId: string | null = null;
@ViewChild(MatPaginator) paginator!: MatPaginator;
@ -185,7 +211,7 @@ export class ProvenanceEventTable implements AfterViewInit {
showLineage = false;
provenanceLineage$!: Observable<Lineage | null>;
eventId: string | null = null;
eventId: number | null = null;
minEventTimestamp = -1;
maxEventTimestamp = -1;
@ -253,6 +279,11 @@ export class ProvenanceEventTable implements AfterViewInit {
case 'componentType':
retVal = this.nifiCommon.compareString(a.componentType, b.componentType);
break;
case 'node':
if (a.clusterNodeAddress && b.clusterNodeAddress) {
retVal = this.nifiCommon.compareString(a.clusterNodeAddress, b.clusterNodeAddress);
}
break;
}
return retVal * (isAsc ? 1 : -1);
@ -281,7 +312,7 @@ export class ProvenanceEventTable implements AfterViewInit {
viewDetailsClicked(event: ProvenanceEventSummary) {
this.submitProvenanceEventRequest({
id: event.id,
eventId: event.eventId,
clusterNodeId: event.clusterNodeId
});
}
@ -291,12 +322,12 @@ export class ProvenanceEventTable implements AfterViewInit {
}
select(event: ProvenanceEventSummary): void {
this.selectedEventId = event.id;
this.selectedId = event.id;
}
isSelected(event: ProvenanceEventSummary): boolean {
if (this.selectedEventId) {
return event.id == this.selectedEventId;
if (this.selectedId) {
return event.id == this.selectedId;
}
return false;
}
@ -321,7 +352,7 @@ export class ProvenanceEventTable implements AfterViewInit {
}
showLineageGraph(event: ProvenanceEventSummary): void {
this.eventId = event.id;
this.eventId = event.eventId;
this.showLineage = true;
this.clearBannerErrors.next();

View File

@ -25,7 +25,7 @@
<input matInput formControlName="value" type="text" />
</mat-form-field>
<div class="-mt-6 mb-4">
<mat-checkbox color="primary" formControlName="inverse" name="inverse"> Exclude </mat-checkbox>
<mat-checkbox color="primary" formControlName="inverse" name="inverse"> Exclude</mat-checkbox>
</div>
</div>
}
@ -65,6 +65,16 @@
</mat-form-field>
</div>
</div>
@if (searchLocationOptions.length > 0) {
<mat-form-field>
<mat-label>Search Location</mat-label>
<mat-select formControlName="searchLocation">
@for (option of searchLocationOptions; track option) {
<mat-option [value]="option.value">{{ option.text }}</mat-option>
}
</mat-select>
</mat-form-field>
}
</mat-dialog-content>
<mat-dialog-actions align="end">
<button color="primary" mat-stroked-button mat-dialog-close>Cancel</button>

View File

@ -21,13 +21,15 @@ import { ProvenanceSearchDialog } from './provenance-search-dialog.component';
import { MAT_DIALOG_DATA } from '@angular/material/dialog';
import { NoopAnimationsModule } from '@angular/platform-browser/animations';
import { MatNativeDateModule } from '@angular/material/core';
import { ProvenanceSearchDialogRequest } from '../../../state/provenance-event-listing';
describe('ProvenanceSearchDialog', () => {
let component: ProvenanceSearchDialog;
let fixture: ComponentFixture<ProvenanceSearchDialog>;
const data: any = {
const data: ProvenanceSearchDialogRequest = {
timeOffset: -18000000,
clusterNodes: [],
options: {
searchableFields: [
{

View File

@ -29,6 +29,11 @@ import {
} from '../../../state/provenance-event-listing';
import { MatDatepickerModule } from '@angular/material/datepicker';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { SelectOption } from '../../../../../state/shared';
import { TextTip } from '../../../../../ui/common/tooltips/text-tip/text-tip.component';
import { MatOption } from '@angular/material/autocomplete';
import { MatSelect } from '@angular/material/select';
import { NifiTooltipDirective } from '../../../../../ui/common/tooltips/nifi-tooltip.directive';
@Component({
selector: 'provenance-search-dialog',
@ -41,12 +46,16 @@ import { NiFiCommon } from '../../../../../service/nifi-common.service';
MatCheckboxModule,
MatButtonModule,
AsyncPipe,
MatDatepickerModule
MatDatepickerModule,
MatOption,
MatSelect,
NifiTooltipDirective
],
styleUrls: ['./provenance-search-dialog.component.scss']
})
export class ProvenanceSearchDialog {
@Input() timezone!: string;
@Output() submitSearchCriteria: EventEmitter<ProvenanceRequest> = new EventEmitter<ProvenanceRequest>();
public static readonly MAX_RESULTS: number = 1000;
@ -55,6 +64,7 @@ export class ProvenanceSearchDialog {
private static readonly TIME_REGEX = /^([0-1]\d|2[0-3]):([0-5]\d):([0-5]\d)$/;
provenanceOptionsForm: FormGroup;
searchLocationOptions: SelectOption[] = [];
constructor(
@Inject(MAT_DIALOG_DATA) public request: ProvenanceSearchDialogRequest,
@ -137,6 +147,31 @@ export class ProvenanceSearchDialog {
})
);
});
if (request.clusterNodes.length > 0) {
this.searchLocationOptions = [
{
text: 'cluster',
value: null
}
];
const sortedNodes = [...this.request.clusterNodes];
sortedNodes.sort((a, b) => {
return this.nifiCommon.compareString(a.address, b.address);
});
this.searchLocationOptions.push(
...sortedNodes.map((node) => {
return {
text: node.address,
value: node.id
};
})
);
this.provenanceOptionsForm.addControl('searchLocation', new FormControl(null));
}
}
private clearTime(date: Date): void {
@ -220,6 +255,15 @@ export class ProvenanceSearchDialog {
});
provenanceRequest.searchTerms = searchTerms;
if (this.searchLocationOptions.length > 0) {
const searchLocation = this.provenanceOptionsForm.get('searchLocation')?.value;
if (searchLocation) {
provenanceRequest.clusterNodeId = searchLocation;
}
}
this.submitSearchCriteria.next(provenanceRequest);
}
protected readonly TextTip = TextTip;
}

View File

@ -17,9 +17,15 @@
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { HttpClient, HttpParams } from '@angular/common/http';
import { NiFiCommon } from '../../../service/nifi-common.service';
import { FlowFileSummary, ListingRequest, SubmitQueueListingRequest } from '../state/queue-listing';
import {
DownloadFlowFileContentRequest,
FlowFileSummary,
ListingRequest,
SubmitQueueListingRequest,
ViewFlowFileContentRequest
} from '../state/queue-listing';
@Injectable({ providedIn: 'root' })
export class QueueService {
@ -35,7 +41,12 @@ export class QueueService {
}
getFlowFile(flowfileSummary: FlowFileSummary): Observable<any> {
return this.httpClient.get(this.nifiCommon.stripProtocol(flowfileSummary.uri));
let params = new HttpParams();
if (flowfileSummary.clusterNodeId) {
params = params.set('clusterNodeId', flowfileSummary.clusterNodeId);
}
return this.httpClient.get(this.nifiCommon.stripProtocol(flowfileSummary.uri), { params });
}
submitQueueListingRequest(queueListingRequest: SubmitQueueListingRequest): Observable<any> {
@ -53,12 +64,14 @@ export class QueueService {
return this.httpClient.delete(this.nifiCommon.stripProtocol(listingRequest.uri));
}
downloadContent(flowfileSummary: FlowFileSummary): void {
let dataUri = `${this.nifiCommon.stripProtocol(flowfileSummary.uri)}/content`;
downloadContent(request: DownloadFlowFileContentRequest): void {
let dataUri = `${this.nifiCommon.stripProtocol(request.uri)}/content`;
const queryParameters: any = {};
// TODO - flowFileSummary.clusterNodeId in query parameters
if (request.clusterNodeId) {
queryParameters['clusterNodeId'] = request.clusterNodeId;
}
if (Object.keys(queryParameters).length > 0) {
const query: string = new URLSearchParams(queryParameters).toString();
@ -68,13 +81,15 @@ export class QueueService {
window.open(dataUri);
}
viewContent(flowfileSummary: FlowFileSummary, contentViewerUrl: string): void {
viewContent(request: ViewFlowFileContentRequest, contentViewerUrl: string): void {
// build the uri to the data
let dataUri = `${this.nifiCommon.stripProtocol(flowfileSummary.uri)}/content`;
let dataUri = `${this.nifiCommon.stripProtocol(request.uri)}/content`;
const dataUriParameters: any = {};
// TODO - flowFileSummary.clusterNodeId in query parameters
if (request.clusterNodeId) {
dataUriParameters['clusterNodeId'] = request.clusterNodeId;
}
// include parameters if necessary
if (Object.keys(dataUriParameters).length > 0) {

View File

@ -89,15 +89,18 @@ export interface ViewFlowFileRequest {
}
export interface DownloadFlowFileContentRequest {
flowfileSummary: FlowFileSummary;
uri: string;
clusterNodeId?: string;
}
export interface ViewFlowFileContentRequest {
flowfileSummary: FlowFileSummary;
uri: string;
clusterNodeId?: string;
}
export interface FlowFileDialogRequest {
flowfile: FlowFile;
clusterNodeId?: string;
}
export interface QueueListingState {

View File

@ -238,7 +238,8 @@ export class QueueListingEffects {
map((response) =>
QueueListingActions.openFlowFileDialog({
request: {
flowfile: response.flowFile
flowfile: response.flowFile,
clusterNodeId: request.flowfileSummary.clusterNodeId
}
})
),
@ -274,7 +275,10 @@ export class QueueListingEffects {
.subscribe(() => {
this.store.dispatch(
QueueListingActions.downloadFlowFileContent({
request: { flowfileSummary: request.flowfile }
request: {
uri: request.flowfile.uri,
clusterNodeId: request.clusterNodeId
}
})
);
});
@ -285,14 +289,19 @@ export class QueueListingEffects {
.subscribe(() => {
this.store.dispatch(
QueueListingActions.viewFlowFileContent({
request: { flowfileSummary: request.flowfile }
request: {
uri: request.flowfile.uri,
clusterNodeId: request.clusterNodeId
}
})
);
});
}
})
),
{ dispatch: false }
{
dispatch: false
}
);
downloadFlowFileContent$ = createEffect(
@ -300,7 +309,7 @@ export class QueueListingEffects {
this.actions$.pipe(
ofType(QueueListingActions.downloadFlowFileContent),
map((action) => action.request),
tap((request) => this.queueService.downloadContent(request.flowfileSummary))
tap((request) => this.queueService.downloadContent(request))
),
{ dispatch: false }
);
@ -312,7 +321,7 @@ export class QueueListingEffects {
map((action) => action.request),
concatLatestFrom(() => this.store.select(selectAbout).pipe(isDefinedAndNotNull())),
tap(([request, about]) => {
this.queueService.viewContent(request.flowfileSummary, about.contentViewerUrl);
this.queueService.viewContent(request, about.contentViewerUrl);
})
),
{ dispatch: false }

View File

@ -108,6 +108,16 @@
</td>
</ng-container>
<!-- Node Column -->
@if (displayedColumns.includes('node')) {
<ng-container matColumnDef="node">
<th mat-header-cell *matHeaderCellDef>Node</th>
<td mat-cell *matCellDef="let item" [title]="item.clusterNodeAddress">
{{ item.clusterNodeAddress }}
</td>
</ng-container>
}
<!-- Actions Column -->
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>

View File

@ -26,6 +26,7 @@ import { RouterLink } from '@angular/router';
import { FlowFileSummary, ListingRequest } from '../../../state/queue-listing';
import { CurrentUser } from '../../../../../state/current-user';
import { ErrorBanner } from '../../../../../ui/common/error-banner/error-banner.component';
import { ClusterSummary } from '../../../../../state/cluster-summary';
@Component({
selector: 'flowfile-table',
@ -49,6 +50,20 @@ export class FlowFileTable {
this.destinationRunning = listingRequest.destinationRunning;
}
}
@Input() set clusterSummary(clusterSummary: ClusterSummary) {
if (clusterSummary?.connectedToCluster) {
// if we're connected to the cluster add a node column if it's not already present
if (!this.displayedColumns.includes('node')) {
this.displayedColumns.splice(this.displayedColumns.length - 1, 0, 'node');
}
} else {
// if we're not connected to the cluster remove the node column if it is present
const nodeIndex = this.displayedColumns.indexOf('node');
if (nodeIndex > -1) {
this.displayedColumns.splice(nodeIndex, 1);
}
}
}
@Input() currentUser!: CurrentUser;
@Input() contentViewerAvailable!: boolean;
@ -61,7 +76,6 @@ export class FlowFileTable {
protected readonly BulletinsTip = BulletinsTip;
protected readonly ValidationErrorsTip = ValidationErrorsTip;
// TODO - conditionally include the cluster column
displayedColumns: string[] = [
'moreDetails',
'position',

View File

@ -24,6 +24,7 @@
[connectionLabel]="(connectionLabel$ | async)!"
[listingRequest]="listingRequest"
[currentUser]="(currentUser$ | async)!"
[clusterSummary]="(clusterSummary$ | async)!"
[contentViewerAvailable]="contentViewerAvailable(about)"
(viewFlowFile)="viewFlowFile($event)"
(downloadContent)="downloadContent($event)"

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
import { Component, OnDestroy } from '@angular/core';
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Store } from '@ngrx/store';
import { distinctUntilChanged, filter } from 'rxjs';
import {
@ -41,19 +41,22 @@ import { selectAbout } from '../../../../state/about/about.selectors';
import { About } from '../../../../state/about';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { clearBannerErrors } from '../../../../state/error/error.actions';
import { selectClusterSummary } from '../../../../state/cluster-summary/cluster-summary.selectors';
import { loadClusterSummary } from '../../../../state/cluster-summary/cluster-summary.actions';
@Component({
selector: 'queue-listing',
templateUrl: './queue-listing.component.html',
styleUrls: ['./queue-listing.component.scss']
})
export class QueueListing implements OnDestroy {
export class QueueListing implements OnInit, OnDestroy {
status$ = this.store.select(selectStatus);
connectionLabel$ = this.store.select(selectConnectionLabel);
loadedTimestamp$ = this.store.select(selectLoadedTimestamp);
listingRequest$ = this.store.select(selectCompletedListingRequest);
currentUser$ = this.store.select(selectCurrentUser);
about$ = this.store.select(selectAbout);
clusterSummary$ = this.store.select(selectClusterSummary);
constructor(private store: Store<NiFiState>) {
this.store
@ -81,6 +84,10 @@ export class QueueListing implements OnDestroy {
});
}
ngOnInit(): void {
this.store.dispatch(loadClusterSummary());
}
refreshClicked(): void {
this.store.dispatch(resubmitQueueListingRequest());
}
@ -94,11 +101,25 @@ export class QueueListing implements OnDestroy {
}
downloadContent(flowfileSummary: FlowFileSummary): void {
this.store.dispatch(downloadFlowFileContent({ request: { flowfileSummary } }));
this.store.dispatch(
downloadFlowFileContent({
request: {
uri: flowfileSummary.uri,
clusterNodeId: flowfileSummary.clusterNodeId
}
})
);
}
viewContent(flowfileSummary: FlowFileSummary): void {
this.store.dispatch(viewFlowFileContent({ request: { flowfileSummary } }));
this.store.dispatch(
viewFlowFileContent({
request: {
uri: flowfileSummary.uri,
clusterNodeId: flowfileSummary.clusterNodeId
}
})
);
}
ngOnDestroy(): void {

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpClient, HttpParams } from '@angular/common/http';
import { ClusterSearchResults } from '../state/cluster-summary';
@Injectable({ providedIn: 'root' })
export class ClusterService {
private static readonly API: string = '../nifi-api';
constructor(private httpClient: HttpClient) {}
getClusterSummary(): Observable<any> {
return this.httpClient.get(`${ClusterService.API}/flow/cluster/summary`);
}
searchCluster(q?: string): Observable<ClusterSearchResults> {
let params = new HttpParams();
if (q) {
params = params.set('q', q);
}
return this.httpClient.get<ClusterSearchResults>(`${ClusterService.API}/flow/cluster/search-results`, {
params
});
}
}

View File

@ -24,4 +24,4 @@ export const loadAboutSuccess = createAction('[About] Load About Success', props
export const aboutApiError = createAction('[About] About Api Error', props<{ error: string }>());
export const clearAboutApiError = createAction('[User] Clear About Api Error');
export const clearAboutApiError = createAction('[About] Clear About Api Error');

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createAction, props } from '@ngrx/store';
import { LoadClusterSummaryResponse } from './index';
const CLUSTER_SUMMARY_STATE_PREFIX = '[Cluster Summary State]';
export const startClusterSummaryPolling = createAction(`${CLUSTER_SUMMARY_STATE_PREFIX} Start Cluster Summary Polling`);
export const stopClusterSummaryPolling = createAction(`${CLUSTER_SUMMARY_STATE_PREFIX} Stop Cluster Summary Polling`);
export const loadClusterSummary = createAction(`${CLUSTER_SUMMARY_STATE_PREFIX} Load Cluster Summary`);
export const loadClusterSummarySuccess = createAction(
`${CLUSTER_SUMMARY_STATE_PREFIX} Load Cluster Summary Success`,
props<{ response: LoadClusterSummaryResponse }>()
);
export const clusterSummaryApiError = createAction(
`${CLUSTER_SUMMARY_STATE_PREFIX} Cluster Summary Api Error`,
props<{ error: string }>()
);
export const clearClusterSummaryApiError = createAction(`${CLUSTER_SUMMARY_STATE_PREFIX} Clear About Api Error`);

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import * as ClusterSummaryActions from './cluster-summary.actions';
import { asyncScheduler, catchError, from, interval, map, of, switchMap, takeUntil } from 'rxjs';
import { ClusterService } from '../../service/cluster.service';
@Injectable()
export class ClusterSummaryEffects {
constructor(
private actions$: Actions,
private clusterService: ClusterService
) {}
loadClusterSummary$ = createEffect(() =>
this.actions$.pipe(
ofType(ClusterSummaryActions.loadClusterSummary),
switchMap(() => {
return from(
this.clusterService.getClusterSummary().pipe(
map((response) =>
ClusterSummaryActions.loadClusterSummarySuccess({
response
})
),
catchError((error) => of(ClusterSummaryActions.clusterSummaryApiError({ error: error.error })))
)
);
})
)
);
startProcessGroupPolling$ = createEffect(() =>
this.actions$.pipe(
ofType(ClusterSummaryActions.startClusterSummaryPolling),
switchMap(() =>
interval(30000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(ClusterSummaryActions.stopClusterSummaryPolling)))
)
),
switchMap(() => of(ClusterSummaryActions.loadClusterSummary()))
)
);
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createReducer, on } from '@ngrx/store';
import { ClusterSummaryState } from './index';
import {
clusterSummaryApiError,
clearClusterSummaryApiError,
loadClusterSummary,
loadClusterSummarySuccess
} from './cluster-summary.actions';
export const initialState: ClusterSummaryState = {
clusterSummary: null,
error: null,
status: 'pending'
};
export const clusterSummaryReducer = createReducer(
initialState,
on(loadClusterSummary, (state) => ({
...state,
status: 'loading' as const
})),
on(loadClusterSummarySuccess, (state, { response }) => ({
...state,
clusterSummary: response.clusterSummary,
error: null,
status: 'success' as const
})),
on(clusterSummaryApiError, (state, { error }) => ({
...state,
error,
status: 'error' as const
})),
on(clearClusterSummaryApiError, (state) => ({
...state,
error: null,
status: 'pending' as const
}))
);

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createFeatureSelector, createSelector } from '@ngrx/store';
import { clusterSummaryFeatureKey, ClusterSummaryState } from './index';
export const selectClusterSummaryState = createFeatureSelector<ClusterSummaryState>(clusterSummaryFeatureKey);
export const selectClusterSummary = createSelector(
selectClusterSummaryState,
(state: ClusterSummaryState) => state.clusterSummary
);

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export const clusterSummaryFeatureKey = 'clusterSummary';
export interface LoadClusterSummaryResponse {
clusterSummary: ClusterSummary;
}
export interface ClusterSummary {
clustered: boolean;
connectedToCluster: boolean;
connectedNodes?: string;
connectedNodeCount: number;
totalNodeCount: number;
}
export interface NodeSearchResult {
id: string;
address: string;
}
export interface ClusterSearchResults {
nodeResults: NodeSearchResult[];
}
export interface ClusterSummaryState {
clusterSummary: ClusterSummary | null;
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';
}

View File

@ -37,6 +37,8 @@ import { errorFeatureKey, ErrorState } from './error';
import { errorReducer } from './error/error.reducer';
import { documentationFeatureKey, DocumentationState } from './documentation';
import { documentationReducer } from './documentation/documentation.reducer';
import { clusterSummaryFeatureKey, ClusterSummaryState } from './cluster-summary';
import { clusterSummaryReducer } from './cluster-summary/cluster-summary.reducer';
export interface NiFiState {
router: RouterReducerState;
@ -50,6 +52,7 @@ export interface NiFiState {
[systemDiagnosticsFeatureKey]: SystemDiagnosticsState;
[componentStateFeatureKey]: ComponentStateState;
[documentationFeatureKey]: DocumentationState;
[clusterSummaryFeatureKey]: ClusterSummaryState;
}
export const rootReducers: ActionReducerMap<NiFiState> = {
@ -63,5 +66,6 @@ export const rootReducers: ActionReducerMap<NiFiState> = {
[controllerServiceStateFeatureKey]: controllerServiceStateReducer,
[systemDiagnosticsFeatureKey]: systemDiagnosticsReducer,
[componentStateFeatureKey]: componentStateReducer,
[documentationFeatureKey]: documentationReducer
[documentationFeatureKey]: documentationReducer,
[clusterSummaryFeatureKey]: clusterSummaryReducer
};