NIFI-12589: Queue Listing (#8235)

* NIFI-12589:
- Introducing queue listing.
- View flowfile content.
- Download flowfile content.
- View flowfile dialog.

* NIFI-12589:
- Addressing review feedback.

* NIFI-12589:
- Fixing merge conflict and removing additional @ts-ignore.

This closes #8235
This commit is contained in:
Matt Gilman 2024-01-12 16:09:48 -05:00 committed by GitHub
parent 4efabdcf51
commit a7546a7095
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 2182 additions and 89 deletions

View File

@ -68,6 +68,11 @@ const routes: Routes = [
canMatch: [authenticationGuard],
loadChildren: () => import('./pages/bulletins/feature/bulletins.module').then((m) => m.BulletinsModule)
},
{
path: 'queue',
canMatch: [authenticationGuard],
loadChildren: () => import('./pages/queue/feature/queue.module').then((m) => m.QueueModule)
},
{
path: '',
canMatch: [authenticationGuard],

View File

@ -42,7 +42,6 @@ import { ControllerServiceStateEffects } from './state/contoller-service-state/c
import { SystemDiagnosticsEffects } from './state/system-diagnostics/system-diagnostics.effects';
import { FlowConfigurationEffects } from './state/flow-configuration/flow-configuration.effects';
// @ts-ignore
@NgModule({
declarations: [AppComponent],
imports: [

View File

@ -27,7 +27,7 @@ import { AccessPolicyService } from '../../service/access-policy.service';
import { AccessPolicyEntity, ComponentResourceAction, PolicyStatus, ResourceAction } from '../shared';
import { selectAccessPolicy, selectResourceAction, selectSaving } from './access-policy.selectors';
import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
import { TenantEntity } from '../../../../state/shared';
import { isDefinedAndNotNull, TenantEntity } from '../../../../state/shared';
import { AddTenantToPolicyDialog } from '../../ui/common/add-tenant-to-policy-dialog/add-tenant-to-policy-dialog.component';
import { AddTenantsToPolicyRequest } from './index';
import { ComponentAccessPolicies } from '../../ui/component-access-policies/component-access-policies.component';
@ -62,13 +62,11 @@ export class AccessPolicyEffects {
reloadAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.reloadAccessPolicy),
withLatestFrom(this.store.select(selectResourceAction)),
filter(([action, resourceAction]) => resourceAction != null),
withLatestFrom(this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
switchMap(([action, resourceAction]) => {
return of(
AccessPolicyActions.loadAccessPolicy({
request: {
// @ts-ignore
resourceAction
}
})
@ -139,10 +137,8 @@ export class AccessPolicyEffects {
createAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.createAccessPolicy),
withLatestFrom(this.store.select(selectResourceAction)),
filter(([action, resourceAction]) => resourceAction != null),
withLatestFrom(this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
switchMap(([action, resourceAction]) =>
// @ts-ignore
from(this.accessPoliciesService.createAccessPolicy(resourceAction)).pipe(
map((response) => {
const accessPolicy: AccessPolicyEntity = response;
@ -243,12 +239,8 @@ export class AccessPolicyEffects {
this.actions$.pipe(
ofType(AccessPolicyActions.addTenantsToPolicy),
map((action) => action.request),
withLatestFrom(this.store.select(selectAccessPolicy)),
filter(([request, accessPolicyEntity]) => accessPolicyEntity != null),
switchMap(([request, accessPolicyEntity]) => {
// @ts-ignore
const accessPolicy: AccessPolicyEntity = accessPolicyEntity;
withLatestFrom(this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
switchMap(([request, accessPolicy]) => {
const users: TenantEntity[] = [...accessPolicy.component.users, ...request.users];
const userGroups: TenantEntity[] = [...accessPolicy.component.userGroups, ...request.userGroups];
@ -303,12 +295,8 @@ export class AccessPolicyEffects {
this.actions$.pipe(
ofType(AccessPolicyActions.removeTenantFromPolicy),
map((action) => action.request),
withLatestFrom(this.store.select(selectAccessPolicy)),
filter(([request, accessPolicyEntity]) => accessPolicyEntity != null),
switchMap(([request, accessPolicyEntity]) => {
// @ts-ignore
const accessPolicy: AccessPolicyEntity = accessPolicyEntity;
withLatestFrom(this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
switchMap(([request, accessPolicy]) => {
const users: TenantEntity[] = [...accessPolicy.component.users];
const userGroups: TenantEntity[] = [...accessPolicy.component.userGroups];
@ -376,17 +364,17 @@ export class AccessPolicyEffects {
deleteAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.deleteAccessPolicy),
withLatestFrom(this.store.select(selectResourceAction), this.store.select(selectAccessPolicy)),
filter(([action, resourceAction, accessPolicy]) => resourceAction != null && accessPolicy != null),
withLatestFrom(
this.store.select(selectResourceAction).pipe(isDefinedAndNotNull()),
this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())
),
switchMap(([action, resourceAction, accessPolicy]) =>
// @ts-ignore
from(this.accessPoliciesService.deleteAccessPolicy(accessPolicy)).pipe(
map((response) => {
// the policy was removed, we need to reload the policy for this resource and action to fetch
// the inherited policy or correctly when it's not found
return AccessPolicyActions.loadAccessPolicy({
request: {
// @ts-ignore
resourceAction
}
});

View File

@ -38,7 +38,7 @@ import { distinctUntilChanged, filter } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { FormBuilder, FormControl, FormGroup, Validators } from '@angular/forms';
import { NiFiCommon } from '../../../../service/nifi-common.service';
import { ComponentType, SelectOption, TextTipInput } from '../../../../state/shared';
import { ComponentType, isDefinedAndNotNull, SelectOption, TextTipInput } from '../../../../state/shared';
import { TextTip } from '../../../../ui/common/tooltips/text-tip/text-tip.component';
import { AccessPolicyEntity, Action, ComponentResourceAction, PolicyStatus, ResourceAction } from '../../state/shared';
import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.actions';
@ -145,18 +145,13 @@ export class ComponentAccessPolicies implements OnInit, OnDestroy {
this.store
.select(selectComponentResourceActionFromRoute)
.pipe(
filter((resourceAction) => resourceAction != null),
isDefinedAndNotNull(),
distinctUntilChanged((a, b) => {
// @ts-ignore
const aResourceAction: ComponentResourceAction = a;
// @ts-ignore
const bResourceAction: ComponentResourceAction = b;
return (
aResourceAction.action == bResourceAction.action &&
aResourceAction.policy == bResourceAction.policy &&
aResourceAction.resource == bResourceAction.resource &&
aResourceAction.resourceIdentifier == bResourceAction.resourceIdentifier
a.action == b.action &&
a.policy == b.policy &&
a.resource == b.resource &&
a.resourceIdentifier == b.resourceIdentifier
);
}),
takeUntilDestroyed()

View File

@ -38,7 +38,13 @@ import { distinctUntilChanged, filter } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { FormBuilder, FormControl, FormGroup, Validators } from '@angular/forms';
import { NiFiCommon } from '../../../../service/nifi-common.service';
import { ComponentType, RequiredPermission, SelectOption, TextTipInput } from '../../../../state/shared';
import {
ComponentType,
isDefinedAndNotNull,
RequiredPermission,
SelectOption,
TextTipInput
} from '../../../../state/shared';
import { TextTip } from '../../../../ui/common/tooltips/text-tip/text-tip.component';
import { AccessPolicyEntity, Action, PolicyStatus, ResourceAction } from '../../state/shared';
import { loadExtensionTypesForPolicies } from '../../../../state/extension-types/extension-types.actions';
@ -126,13 +132,8 @@ export class GlobalAccessPolicies implements OnInit, OnDestroy {
this.store
.select(selectGlobalResourceActionFromRoute)
.pipe(
filter((resourceAction) => resourceAction != null),
distinctUntilChanged((aResourceAction, bResourceAction) => {
// @ts-ignore
const a: ResourceAction = aResourceAction;
// @ts-ignore
const b: ResourceAction = bResourceAction;
isDefinedAndNotNull(),
distinctUntilChanged((a, b) => {
return (
a.action == b.action && a.resource == b.resource && a.resourceIdentifier == b.resourceIdentifier
);

View File

@ -32,6 +32,7 @@ import {
navigateToEditCurrentProcessGroup,
navigateToManageComponentPolicies,
navigateToProvenanceForComponent,
navigateToQueueListing,
navigateToViewStatusHistoryForComponent,
reloadFlow,
replayLastProvenanceEvent,
@ -688,13 +689,19 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider {
},
{
condition: (selection: any) => {
// TODO - canListQueue
return false;
return this.canvasUtils.isConnection(selection);
},
clazz: 'fa fa-list',
text: 'List queue',
action: () => {
// TODO - listQueue
action: (selection: any) => {
const selectionData = selection.datum();
this.store.dispatch(
navigateToQueueListing({
request: {
connectionId: selectionData.id
}
})
);
}
},
{

View File

@ -188,6 +188,11 @@ export class CanvasView {
public updateCanvasVisibility(): void {
const self: CanvasView = this;
const canvasContainer: any = document.getElementById('canvas-container');
if (canvasContainer == null) {
return;
}
let translate = [this.x, this.y];
const scale = this.k;

View File

@ -67,7 +67,8 @@ import {
UpdateConnectionRequest,
UpdateConnectionSuccess,
UpdatePositionsRequest,
UploadProcessGroupRequest
UploadProcessGroupRequest,
NavigateToQueueListing
} from './index';
import { StatusHistoryRequest } from '../../../../state/status-history';
@ -315,6 +316,11 @@ export const navigateToControllerServicesForProcessGroup = createAction(
props<{ request: NavigateToControllerServicesRequest }>()
);
export const navigateToQueueListing = createAction(
`${CANVAS_PREFIX} Navigate To Queue Listing`,
props<{ request: NavigateToQueueListing }>()
);
export const editCurrentProcessGroup = createAction(
`${CANVAS_PREFIX} Edit Current Process Group`,
props<{

View File

@ -686,6 +686,18 @@ export class FlowEffects {
{ dispatch: false }
);
navigateToQueueListing$ = createEffect(
() =>
this.actions$.pipe(
ofType(FlowActions.navigateToQueueListing),
map((action) => action.request),
tap((request) => {
this.router.navigate(['/queue', request.connectionId]);
})
),
{ dispatch: false }
);
navigateToViewStatusHistoryForComponent$ = createEffect(
() =>
this.actions$.pipe(

View File

@ -230,6 +230,10 @@ export interface NavigateToControllerServicesRequest {
id: string;
}
export interface NavigateToQueueListing {
connectionId: string;
}
export interface EditCurrentProcessGroupRequest {
id: string;
}

View File

@ -58,9 +58,8 @@ import {
} from '../../state/flow/flow.selectors';
import { filter, map, switchMap, take, withLatestFrom } from 'rxjs';
import { restoreViewport, zoomFit } from '../../state/transform/transform.actions';
import { ComponentType } from '../../../../state/shared';
import { ComponentType, isDefinedAndNotNull } from '../../../../state/shared';
import { initialState } from '../../state/flow/flow.reducer';
import { ContextMenuDefinitionProvider } from '../../../../ui/common/context-menu/context-menu.component';
import { CanvasContextMenu } from '../../service/canvas-context-menu.service';
import { getStatusHistoryAndOpenDialog } from '../../../../state/status-history/status-history.actions';
import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.actions';
@ -168,37 +167,33 @@ export class Canvas implements OnInit, OnDestroy {
.pipe(
filter((processGroupId) => processGroupId != initialState.id),
switchMap(() => this.store.select(selectSingleEditedComponent)),
// ensure there is a selected component
filter((selectedComponent) => selectedComponent != null),
isDefinedAndNotNull(),
switchMap((selectedComponent) => {
// @ts-ignore
const component: SelectedComponent = selectedComponent;
let component$;
switch (component.componentType) {
switch (selectedComponent.componentType) {
case ComponentType.Processor:
component$ = this.store.select(selectProcessor(component.id));
component$ = this.store.select(selectProcessor(selectedComponent.id));
break;
case ComponentType.InputPort:
component$ = this.store.select(selectInputPort(component.id));
component$ = this.store.select(selectInputPort(selectedComponent.id));
break;
case ComponentType.OutputPort:
component$ = this.store.select(selectOutputPort(component.id));
component$ = this.store.select(selectOutputPort(selectedComponent.id));
break;
case ComponentType.ProcessGroup:
component$ = this.store.select(selectProcessGroup(component.id));
component$ = this.store.select(selectProcessGroup(selectedComponent.id));
break;
case ComponentType.RemoteProcessGroup:
component$ = this.store.select(selectRemoteProcessGroup(component.id));
component$ = this.store.select(selectRemoteProcessGroup(selectedComponent.id));
break;
case ComponentType.Connection:
component$ = this.store.select(selectConnection(component.id));
component$ = this.store.select(selectConnection(selectedComponent.id));
break;
case ComponentType.Funnel:
component$ = this.store.select(selectFunnel(component.id));
component$ = this.store.select(selectFunnel(selectedComponent.id));
break;
case ComponentType.Label:
component$ = this.store.select(selectLabel(component.id));
component$ = this.store.select(selectLabel(selectedComponent.id));
break;
default:
throw 'Unrecognized Component Type';

View File

@ -36,8 +36,7 @@ import { ComponentType } from '../../../../../../state/shared';
styleUrls: ['./new-canvas-item.component.scss']
})
export class NewCanvasItem implements OnInit {
// @ts-ignore
@Input() type: ComponentType;
@Input() type!: ComponentType;
@Input() iconClass: string = '';
@Input() iconHoverClass: string = '';

View File

@ -16,7 +16,7 @@
*/
/*
Parameter Contexts
Provenance
*/
import { Action, combineReducers, createFeatureSelector } from '@ngrx/store';

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { NgModule } from '@angular/core';
import { RouterModule, Routes } from '@angular/router';
import { Queue } from './queue.component';
import { QueueListingRoutingModule } from '../ui/queue-listing/queue-listing-routing.module';
const routes: Routes = [
{
path: '',
component: Queue,
children: [
{
path: '',
loadChildren: () => import('../ui/queue-listing/queue-listing.module').then((m) => m.QueueListingModule)
}
]
}
];
@NgModule({
imports: [RouterModule.forChild(routes)],
exports: [RouterModule]
})
export class QueueRoutingModule {}

View File

@ -0,0 +1,27 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="p-4 flex flex-col h-screen justify-between gap-y-5">
<div class="flex justify-end">
<button class="nifi-button" [routerLink]="['/']">
<i class="fa fa-times"></i>
</button>
</div>
<div class="flex-1">
<router-outlet></router-outlet>
</div>
</div>

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { Queue } from './queue.component';
import { provideMockStore } from '@ngrx/store/testing';
import { RouterModule } from '@angular/router';
import { RouterTestingModule } from '@angular/router/testing';
import { initialState } from '../state/queue-listing/queue-listing.reducer';
describe('Queue', () => {
let component: Queue;
let fixture: ComponentFixture<Queue>;
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [Queue],
imports: [RouterModule, RouterTestingModule],
providers: [
provideMockStore({
initialState
})
]
});
fixture = TestBed.createComponent(Queue);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../state';
import { startCurrentUserPolling, stopCurrentUserPolling } from '../../../state/current-user/current-user.actions';
import { loadAbout } from '../../../state/about/about.actions';
@Component({
selector: 'queue',
templateUrl: './queue.component.html',
styleUrls: ['./queue.component.scss']
})
export class Queue implements OnInit, OnDestroy {
constructor(private store: Store<NiFiState>) {}
ngOnInit(): void {
this.store.dispatch(startCurrentUserPolling());
this.store.dispatch(loadAbout());
}
ngOnDestroy(): void {
this.store.dispatch(stopCurrentUserPolling());
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
import { StoreModule } from '@ngrx/store';
import { EffectsModule } from '@ngrx/effects';
import { Queue } from './queue.component';
import { QueueRoutingModule } from './queue-routing.module';
import { queueFeatureKey, reducers } from '../state';
import { MatDialogModule } from '@angular/material/dialog';
import { QueueListingEffects } from '../state/queue-listing/queue-listing.effects';
@NgModule({
declarations: [Queue],
exports: [Queue],
imports: [CommonModule, MatDialogModule, QueueRoutingModule]
})
export class QueueModule {}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Observable, throwError } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { NiFiCommon } from '../../../service/nifi-common.service';
import { ParameterContextUpdateRequest, SubmitParameterContextUpdate } from '../../../state/shared';
import {
FlowFileSummary,
ListingRequest,
ListingRequestEntity,
SubmitQueueListingRequest
} from '../state/queue-listing';
@Injectable({ providedIn: 'root' })
export class QueueService {
private static readonly API: string = '../nifi-api';
constructor(
private httpClient: HttpClient,
private nifiCommon: NiFiCommon
) {}
/**
* The NiFi model contain the url for each component. That URL is an absolute URL. Angular CSRF handling
* does not work on absolute URLs, so we need to strip off the proto for the request header to be added.
*
* https://stackoverflow.com/a/59586462
*
* @param url
* @private
*/
private stripProtocol(url: string): string {
return this.nifiCommon.substringAfterFirst(url, ':');
}
getConnection(connectionId: string): Observable<any> {
return this.httpClient.get(`${QueueService.API}/connections/${connectionId}`);
}
getFlowFile(flowfileSummary: FlowFileSummary): Observable<any> {
return this.httpClient.get(this.stripProtocol(flowfileSummary.uri));
}
submitQueueListingRequest(queueListingRequest: SubmitQueueListingRequest): Observable<any> {
return this.httpClient.post(
`${QueueService.API}/flowfile-queues/${queueListingRequest.connectionId}/listing-requests`,
{}
);
}
pollQueueListingRequest(listingRequest: ListingRequest): Observable<any> {
return this.httpClient.get(this.stripProtocol(listingRequest.uri));
}
deleteQueueListingRequest(listingRequest: ListingRequest): Observable<any> {
return this.httpClient.delete(this.stripProtocol(listingRequest.uri));
}
downloadContent(flowfileSummary: FlowFileSummary): void {
let dataUri: string = `${this.stripProtocol(flowfileSummary.uri)}/content`;
const queryParameters: any = {};
// TODO - flowFileSummary.clusterNodeId in query parameters
if (Object.keys(queryParameters).length > 0) {
const query: string = new URLSearchParams(queryParameters).toString();
dataUri = `${dataUri}?${query}`;
}
window.open(dataUri);
}
viewContent(flowfileSummary: FlowFileSummary, contentViewerUrl: string): void {
// build the uri to the data
let dataUri: string = `${this.stripProtocol(flowfileSummary.uri)}/content`;
const dataUriParameters: any = {};
// TODO - flowFileSummary.clusterNodeId in query parameters
// include parameters if necessary
if (Object.keys(dataUriParameters).length > 0) {
const dataUriQuery: string = new URLSearchParams(dataUriParameters).toString();
dataUri = `${dataUri}?${dataUriQuery}`;
}
// if there's already a query string don't add another ?... this assumes valid
// input meaning that if the url has already included a ? it also contains at
// least one query parameter
let contentViewer: string = contentViewerUrl;
if (contentViewer.indexOf('?') === -1) {
contentViewer += '?';
} else {
contentViewer += '&';
}
const contentViewerParameters: any = {
ref: dataUri
};
// open the content viewer
const contentViewerQuery: string = new URLSearchParams(contentViewerParameters).toString();
window.open(`${contentViewer}${contentViewerQuery}`);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Queue Listing
*/
import { Action, combineReducers, createFeatureSelector } from '@ngrx/store';
import { queueListingFeatureKey, QueueListingState } from './queue-listing';
import { queueListingReducer } from './queue-listing/queue-listing.reducer';
export const queueFeatureKey = 'queue';
export interface QueueState {
[queueListingFeatureKey]: QueueListingState;
}
export function reducers(state: QueueState | undefined, action: Action) {
return combineReducers({
[queueListingFeatureKey]: queueListingReducer
})(state, action);
}
export const selectQueueState = createFeatureSelector<QueueState>(queueFeatureKey);

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export const queueListingFeatureKey = 'queueListing';
export interface FlowFile extends FlowFileSummary {
attributes: {
[key: string]: string;
};
contentClaimContainer?: string;
contentClaimSection?: string;
contentClaimIdentifier?: string;
contentClaimOffset?: number;
contentClaimFileSize?: string;
contentClaimFileSizeBytes?: number;
}
export interface FlowFileSummary {
uri: string;
uuid: string;
filename: string;
position?: number;
size: number;
queuedDuration: number;
lineageDuration: number;
penaltyExpiresIn: number;
penalized: boolean;
clusterNodeId?: string;
clusterNodeAddress?: string;
}
export interface QueueSize {
byteCount: number;
objectCount: number;
}
export interface ListingRequest {
id: string;
uri: string;
submissionTime: string;
lastUpdated: string;
percentCompleted: number;
finished: boolean;
failureReason: string;
maxResults: number;
sourceRunning: boolean;
destinationRunning: boolean;
state: string;
queueSize: QueueSize;
flowFileSummaries: FlowFileSummary[];
}
export interface ListingRequestEntity {
listingRequest: ListingRequest;
}
export interface LoadConnectionLabelRequest {
connectionId: string;
}
export interface LoadConnectionLabelResponse {
connectionLabel: string;
}
export interface SubmitQueueListingRequest {
connectionId: string;
}
export interface PollQueueListingSuccess {
requestEntity: ListingRequestEntity;
}
export interface ViewFlowFileRequest {
flowfileSummary: FlowFileSummary;
}
export interface DownloadFlowFileContentRequest {
flowfileSummary: FlowFileSummary;
}
export interface ViewFlowFileContentRequest {
flowfileSummary: FlowFileSummary;
}
export interface FlowFileDialogRequest {
flowfile: FlowFile;
}
export interface QueueListingState {
requestEntity: ListingRequestEntity | null;
connectionLabel: string;
loadedTimestamp: string;
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createAction, props } from '@ngrx/store';
import {
DownloadFlowFileContentRequest,
FlowFileDialogRequest,
LoadConnectionLabelRequest,
LoadConnectionLabelResponse,
PollQueueListingSuccess,
SubmitQueueListingRequest,
ViewFlowFileContentRequest,
ViewFlowFileRequest
} from './index';
const QUEUE_PREFIX = '[Queue Listing]';
export const loadConnectionLabel = createAction(
`[${QUEUE_PREFIX}] Load Connection Label`,
props<{ request: LoadConnectionLabelRequest }>()
);
export const loadConnectionLabelSuccess = createAction(
`[${QUEUE_PREFIX}] Load Connection Label Success`,
props<{ response: LoadConnectionLabelResponse }>()
);
export const queueListingApiError = createAction(`[${QUEUE_PREFIX}] Queue Error`, props<{ error: string }>());
export const resetQueueListingState = createAction(`[${QUEUE_PREFIX}] Reset Queue Listing State`);
export const submitQueueListingRequest = createAction(
`[${QUEUE_PREFIX}] Submit Queue Listing Request`,
props<{ request: SubmitQueueListingRequest }>()
);
export const resubmitQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Resubmit Queue Listing Request`);
export const submitQueueListingRequestSuccess = createAction(
`[${QUEUE_PREFIX}] Submit Queue Listing Request Success`,
props<{ response: PollQueueListingSuccess }>()
);
export const startPollingQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Start Polling Queue Listing Request`);
export const pollQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Poll Queue Listing Request`);
export const pollQueueListingRequestSuccess = createAction(
`[${QUEUE_PREFIX}] Poll Queue Listing Request Success`,
props<{ response: PollQueueListingSuccess }>()
);
export const stopPollingQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Stop Polling Queue Listing Request`);
export const deleteQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Delete Queue Listing Request`);
export const viewFlowFile = createAction(`[${QUEUE_PREFIX}] View FlowFile`, props<{ request: ViewFlowFileRequest }>());
export const openFlowFileDialog = createAction(
`[${QUEUE_PREFIX}] Open FlowFile Dialog`,
props<{ request: FlowFileDialogRequest }>()
);
export const downloadFlowFileContent = createAction(
`[${QUEUE_PREFIX}] Download FlowFile Content`,
props<{ request: DownloadFlowFileContentRequest }>()
);
export const viewFlowFileContent = createAction(
`[${QUEUE_PREFIX}] View FlowFile Content`,
props<{ request: ViewFlowFileContentRequest }>()
);

View File

@ -0,0 +1,322 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import * as QueueListingActions from './queue-listing.actions';
import { Store } from '@ngrx/store';
import { CanvasState } from '../../../flow-designer/state';
import {
asyncScheduler,
catchError,
filter,
from,
interval,
map,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { selectConnectionIdFromRoute, selectListingRequestEntity } from './queue-listing.selectors';
import { QueueService } from '../../service/queue.service';
import { ListingRequest } from './index';
import { CancelDialog } from '../../../../ui/common/cancel-dialog/cancel-dialog.component';
import { MatDialog } from '@angular/material/dialog';
import { selectAbout } from '../../../../state/about/about.selectors';
import { FlowFileDialog } from '../../ui/queue-listing/flowfile-dialog/flowfile-dialog.component';
import { NiFiCommon } from '../../../../service/nifi-common.service';
import { isDefinedAndNotNull } from '../../../../state/shared';
@Injectable()
export class QueueListingEffects {
constructor(
private actions$: Actions,
private store: Store<CanvasState>,
private queueService: QueueService,
private dialog: MatDialog,
private nifiCommon: NiFiCommon
) {}
loadConnectionLabel$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.loadConnectionLabel),
map((action) => action.request),
switchMap((request) =>
from(this.queueService.getConnection(request.connectionId)).pipe(
map((response) => {
const connection: any = response.component;
let connectionLabel: string = 'Connection';
if (!this.nifiCommon.isBlank(connection.name)) {
connectionLabel = connection.name;
} else if (connection.selectedRelationships) {
connectionLabel = connection.selectedRelationships.join(', ');
}
return QueueListingActions.loadConnectionLabelSuccess({
response: {
connectionLabel
}
});
}),
catchError((error) =>
of(
QueueListingActions.loadConnectionLabelSuccess({
response: {
connectionLabel: 'Connection'
}
})
)
)
)
)
)
);
submitQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.submitQueueListingRequest),
map((action) => action.request),
switchMap((request) => {
const dialogReference = this.dialog.open(CancelDialog, {
data: {
title: 'Queue Listing',
message: 'Waiting for queue listing to complete...'
},
disableClose: true,
panelClass: 'small-dialog'
});
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
this.store.dispatch(QueueListingActions.stopPollingQueueListingRequest());
});
return from(this.queueService.submitQueueListingRequest(request)).pipe(
map((response) =>
QueueListingActions.submitQueueListingRequestSuccess({
response: {
requestEntity: response
}
})
),
catchError((error) =>
of(
QueueListingActions.queueListingApiError({
error: error.error
})
)
)
);
})
)
);
resubmitQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.resubmitQueueListingRequest),
withLatestFrom(this.store.select(selectConnectionIdFromRoute)),
switchMap(([action, connectionId]) =>
of(QueueListingActions.submitQueueListingRequest({ request: { connectionId } }))
)
)
);
submitQueueListingRequestSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.submitQueueListingRequestSuccess),
map((action) => action.response),
switchMap((response) => {
const listingRequest: ListingRequest = response.requestEntity.listingRequest;
if (listingRequest.finished) {
return of(QueueListingActions.deleteQueueListingRequest());
} else {
return of(QueueListingActions.startPollingQueueListingRequest());
}
})
)
);
startPollingQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.startPollingQueueListingRequest),
switchMap(() =>
interval(2000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(QueueListingActions.stopPollingQueueListingRequest)))
)
),
switchMap(() => of(QueueListingActions.pollQueueListingRequest()))
)
);
pollQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.pollQueueListingRequest),
withLatestFrom(this.store.select(selectListingRequestEntity).pipe(isDefinedAndNotNull())),
switchMap(([action, requestEntity]) => {
return from(this.queueService.pollQueueListingRequest(requestEntity.listingRequest)).pipe(
map((response) =>
QueueListingActions.pollQueueListingRequestSuccess({
response: {
requestEntity: response
}
})
),
catchError((error) =>
of(
QueueListingActions.queueListingApiError({
error: error.error
})
)
)
);
})
)
);
pollQueueListingRequestSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.pollQueueListingRequestSuccess),
map((action) => action.response),
filter((response) => response.requestEntity.listingRequest.finished),
switchMap((response) => of(QueueListingActions.stopPollingQueueListingRequest()))
)
);
stopPollingQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.stopPollingQueueListingRequest),
switchMap((response) => of(QueueListingActions.deleteQueueListingRequest()))
)
);
deleteQueueListingRequest$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueListingActions.deleteQueueListingRequest),
withLatestFrom(this.store.select(selectListingRequestEntity)),
tap(([action, requestEntity]) => {
this.dialog.closeAll();
if (requestEntity) {
this.queueService.deleteQueueListingRequest(requestEntity.listingRequest).subscribe();
}
})
),
{ dispatch: false }
);
viewFlowFile$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.viewFlowFile),
map((action) => action.request),
switchMap((request) =>
from(this.queueService.getFlowFile(request.flowfileSummary)).pipe(
map((response) =>
QueueListingActions.openFlowFileDialog({
request: {
flowfile: response.flowFile
}
})
),
catchError((error) =>
of(
QueueListingActions.queueListingApiError({
error: error.error
})
)
)
)
)
)
);
openFlowFileDialog$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueListingActions.openFlowFileDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout)),
filter((about) => about != null),
tap(([request, about]) => {
const dialogReference = this.dialog.open(FlowFileDialog, {
data: request,
panelClass: 'large-dialog'
});
dialogReference.componentInstance.contentViewerAvailable = about?.contentViewerUrl != null ?? false;
dialogReference.componentInstance.downloadContent
.pipe(takeUntil(dialogReference.afterClosed()))
.subscribe(() => {
this.store.dispatch(
QueueListingActions.downloadFlowFileContent({
request: { flowfileSummary: request.flowfile }
})
);
});
if (about) {
dialogReference.componentInstance.viewContent
.pipe(takeUntil(dialogReference.afterClosed()))
.subscribe(() => {
this.store.dispatch(
QueueListingActions.viewFlowFileContent({
request: { flowfileSummary: request.flowfile }
})
);
});
}
})
),
{ dispatch: false }
);
downloadFlowFileContent$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueListingActions.downloadFlowFileContent),
map((action) => action.request),
tap((request) => this.queueService.downloadContent(request.flowfileSummary))
),
{ dispatch: false }
);
viewFlowFileContent$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueListingActions.viewFlowFileContent),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout).pipe(isDefinedAndNotNull())),
tap(([request, about]) => {
this.queueService.viewContent(request.flowfileSummary, about.contentViewerUrl);
})
),
{ dispatch: false }
);
queueListingApiError$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueListingActions.queueListingApiError),
tap(() => this.dialog.closeAll())
),
{ dispatch: false }
);
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createReducer, on } from '@ngrx/store';
import { QueueListingState } from './index';
import {
pollQueueListingRequestSuccess,
submitQueueListingRequest,
submitQueueListingRequestSuccess,
resetQueueListingState,
queueListingApiError,
loadConnectionLabelSuccess
} from './queue-listing.actions';
export const initialState: QueueListingState = {
requestEntity: null,
connectionLabel: 'Connection',
loadedTimestamp: 'N/A',
error: null,
status: 'pending'
};
export const queueListingReducer = createReducer(
initialState,
on(loadConnectionLabelSuccess, (state, { response }) => ({
...state,
connectionLabel: response.connectionLabel
})),
on(submitQueueListingRequest, (state, { request }) => ({
...state,
status: 'loading' as const
})),
on(submitQueueListingRequestSuccess, pollQueueListingRequestSuccess, (state, { response }) => ({
...state,
requestEntity: response.requestEntity,
loadedTimestamp: response.requestEntity.listingRequest.lastUpdated,
error: null,
status: 'success' as const
})),
on(queueListingApiError, (state, { error }) => ({
...state,
error,
status: 'error' as const
})),
on(resetQueueListingState, (state) => ({
...initialState
}))
);

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createSelector } from '@ngrx/store';
import { queueListingFeatureKey, QueueListingState } from './index';
import { QueueState, selectQueueState } from '../index';
import { selectCurrentRoute } from '../../../../state/router/router.selectors';
export const selectQueueListingState = createSelector(
selectQueueState,
(state: QueueState) => state[queueListingFeatureKey]
);
export const selectListingRequestEntity = createSelector(
selectQueueListingState,
(state: QueueListingState) => state.requestEntity
);
export const selectStatus = createSelector(selectQueueListingState, (state: QueueListingState) => state.status);
export const selectError = createSelector(selectQueueListingState, (state: QueueListingState) => state.error);
export const selectConnectionLabel = createSelector(
selectQueueListingState,
(state: QueueListingState) => state.connectionLabel
);
export const selectLoadedTimestamp = createSelector(
selectQueueListingState,
(state: QueueListingState) => state.loadedTimestamp
);
export const selectConnectionIdFromRoute = createSelector(selectCurrentRoute, (route) => {
if (route?.params.connectionId != null) {
return route.params.connectionId;
}
return null;
});

View File

@ -0,0 +1,229 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<h2 mat-dialog-title>FlowFile</h2>
<div class="flowfile">
<mat-dialog-content>
<mat-tab-group>
<mat-tab label="Details">
<div class="tab-content py-4">
<div class="absolute inset-0 flex gap-x-4">
<div class="w-full flex flex-col gap-y-3">
<div class="flex flex-col">
<div class="flowfile-header">FlowFile Details</div>
</div>
<div class="flex flex-col">
<div>UUID</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: { $implicit: request.flowfile.uuid }
"></ng-container>
</div>
<div class="flex flex-col">
<div>Filename</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: { $implicit: request.flowfile.filename }
"></ng-container>
</div>
<div class="flex flex-col">
<div>File Size</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: {
$implicit: request.flowfile.size,
title: request.flowfile.size + ' bytes'
}
"></ng-container>
</div>
<div class="flex flex-col">
<div>Queue Position</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: { $implicit: request.flowfile.position }
"></ng-container>
</div>
<div class="flex flex-col">
<div>Queued Duration</div>
<ng-container
*ngTemplateOutlet="
formatDuration;
context: { $implicit: request.flowfile.queuedDuration }
"></ng-container>
</div>
<div class="flex flex-col">
<div>Lineage Duration</div>
<ng-container
*ngTemplateOutlet="
formatDuration;
context: { $implicit: request.flowfile.lineageDuration }
"></ng-container>
</div>
<div class="flex flex-col">
<div>Penalized</div>
<div class="value">{{ request.flowfile.penalized ? 'Yes' : 'No' }}</div>
</div>
<ng-container *ngIf="request.flowfile.clusterNodeId">
<div class="flex flex-col">
<div>Node Address</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: { $implicit: request.flowfile.clusterNodeAddress }
"></ng-container>
</div>
</ng-container>
<ng-template #formatDuration let-duration>
<ng-container *ngIf="duration != null; else noDuration">
<div class="value">{{ formatDurationValue(duration) }}</div>
</ng-container>
<ng-template #noDuration>
<div class="unset">No value set</div>
</ng-template>
</ng-template>
</div>
<div class="w-full flex flex-col gap-y-3">
<div class="flex flex-col">
<div class="flowfile-header">Content Claim</div>
</div>
<div
class="flex flex-col gap-y-3"
*ngIf="request.flowfile.contentClaimContainer != null; else noContent">
<div>
<div>Container</div>
<ng-container
*ngTemplateOutlet="
formatContentValue;
context: { $implicit: request.flowfile.contentClaimContainer }
"></ng-container>
</div>
<div>
<div>Section</div>
<ng-container
*ngTemplateOutlet="
formatContentValue;
context: { $implicit: request.flowfile.contentClaimSection }
"></ng-container>
</div>
<div>
<div>Identifier</div>
<ng-container
*ngTemplateOutlet="
formatContentValue;
context: { $implicit: request.flowfile.contentClaimIdentifier }
"></ng-container>
</div>
<div>
<div>Offset</div>
<ng-container
*ngTemplateOutlet="
formatContentValue;
context: { $implicit: request.flowfile.contentClaimOffset }
"></ng-container>
</div>
<div>
<div>Size</div>
<ng-container
*ngTemplateOutlet="
formatContentValue;
context: {
$implicit: request.flowfile.contentClaimFileSize,
title: request.flowfile.contentClaimFileSizeBytes + ' bytes'
}
"></ng-container>
</div>
<div class="flex">
<button color="accent" mat-raised-button (click)="downloadContentClicked()">
<i class="fa fa-download"></i>
Download
</button>
<button
*ngIf="contentViewerAvailable"
class="ml-3"
color="accent"
mat-raised-button
(click)="viewContentClicked()">
<i class="fa fa-eye"></i>
View
</button>
</div>
</div>
<ng-template #noContent>
<div class="unset">No Content Available</div>
</ng-template>
</div>
</div>
</div>
</mat-tab>
<mat-tab label="Attributes">
<div class="tab-content py-4">
<div class="absolute inset-0 flex flex-col gap-y-4">
<div class="flex">
<div class="flowfile-header">Attribute Values</div>
</div>
<div class="flex flex-col">
<div *ngFor="let attribute of request.flowfile.attributes | keyvalue">
<div class="mb-4 flex flex-col">
<div>{{ attribute.key }}</div>
<ng-container
*ngTemplateOutlet="
formatValue;
context: { $implicit: attribute.value }
"></ng-container>
</div>
</div>
</div>
</div>
</div>
</mat-tab>
</mat-tab-group>
<ng-template #formatValue let-value let-title="title">
<ng-container *ngIf="value != null; else nullValue">
<ng-container *ngIf="value === ''; else nonEmptyValue">
<div class="unset">Empty string set</div>
</ng-container>
<ng-template #nonEmptyValue>
<div class="value" *ngIf="title == null; else valueWithTitle">{{ value }}</div>
<ng-template #valueWithTitle>
<div class="value" [title]="title">{{ value }}</div>
</ng-template>
</ng-template>
</ng-container>
<ng-template #nullValue>
<div class="unset">No value set</div>
</ng-template>
</ng-template>
<ng-template #formatContentValue let-value let-title="title">
<ng-container *ngIf="value != null; else nullValue">
<div class="value" *ngIf="title == null; else valueWithTitle">{{ value }}</div>
<ng-template #valueWithTitle>
<div class="value" [title]="title">{{ value }}</div>
</ng-template>
</ng-container>
<ng-template #nullValue>
<div class="unset">No value previously set</div>
</ng-template>
</ng-template>
</mat-dialog-content>
<mat-dialog-actions align="end">
<button color="primary" mat-raised-button mat-dialog-close>Ok</button>
</mat-dialog-actions>
</div>

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@use '@angular/material' as mat;
.flowfile {
@include mat.button-density(-1);
.mdc-dialog__content {
padding: 0 16px;
font-size: 14px;
.tab-content {
position: relative;
height: 475px;
overflow-y: auto;
.flowfile-header {
color: #728e9b;
font-size: 15px;
font-family: 'Roboto Slab';
font-style: normal;
font-weight: bold;
}
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { FlowFileDialog } from './flowfile-dialog.component';
import { MAT_DIALOG_DATA } from '@angular/material/dialog';
import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
import { FlowFileDialogRequest } from '../../../state/queue-listing';
describe('FlowFileDialog', () => {
let component: FlowFileDialog;
let fixture: ComponentFixture<FlowFileDialog>;
const data: FlowFileDialogRequest = {
flowfile: {
uri: 'https://localhost:4200/nifi-api/flowfile-queues/eea858d0-018c-1000-57fe-66ba110b3bcb/flowfiles/fc165889-3493-404c-9895-62d49a06801b',
uuid: 'fc165889-3493-404c-9895-62d49a06801b',
filename: '93a06a31-6b50-4d04-9b45-6a2a4a5e2dd1',
size: 0,
queuedDuration: 172947006,
lineageDuration: 172947006,
penaltyExpiresIn: 0,
attributes: {
path: './',
filename: '93a06a31-6b50-4d04-9b45-6a2a4a5e2dd1',
uuid: 'fc165889-3493-404c-9895-62d49a06801b'
},
penalized: false
}
};
beforeEach(() => {
TestBed.configureTestingModule({
imports: [FlowFileDialog, BrowserAnimationsModule],
providers: [{ provide: MAT_DIALOG_DATA, useValue: data }]
});
fixture = TestBed.createComponent(FlowFileDialog);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, EventEmitter, Inject, Input, Output } from '@angular/core';
import { MAT_DIALOG_DATA, MatDialogModule } from '@angular/material/dialog';
import { FormsModule, ReactiveFormsModule } from '@angular/forms';
import { MatInputModule } from '@angular/material/input';
import { MatCheckboxModule } from '@angular/material/checkbox';
import { MatButtonModule } from '@angular/material/button';
import { AsyncPipe, KeyValuePipe, NgForOf, NgIf, NgTemplateOutlet } from '@angular/common';
import { MatDatepickerModule } from '@angular/material/datepicker';
import { MatTabsModule } from '@angular/material/tabs';
import { FlowFileDialogRequest } from '../../../state/queue-listing';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
@Component({
selector: 'flowfile-dialog',
standalone: true,
templateUrl: './flowfile-dialog.component.html',
imports: [
ReactiveFormsModule,
MatDialogModule,
MatInputModule,
MatCheckboxModule,
MatButtonModule,
NgIf,
AsyncPipe,
NgForOf,
MatDatepickerModule,
MatTabsModule,
NgTemplateOutlet,
FormsModule,
KeyValuePipe
],
styleUrls: ['./flowfile-dialog.component.scss']
})
export class FlowFileDialog {
@Input() contentViewerAvailable!: boolean;
@Output() downloadContent: EventEmitter<void> = new EventEmitter<void>();
@Output() viewContent: EventEmitter<void> = new EventEmitter<void>();
constructor(
@Inject(MAT_DIALOG_DATA) public request: FlowFileDialogRequest,
private nifiCommon: NiFiCommon
) {}
formatDurationValue(duration: number): string {
if (duration === 0) {
return '< 1 sec';
}
return this.nifiCommon.formatDuration(duration);
}
downloadContentClicked(): void {
this.downloadContent.next();
}
viewContentClicked(): void {
this.viewContent.next();
}
}

View File

@ -0,0 +1,147 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="flowfile-table h-full flex flex-col gap-y-2">
<h3 class="text-xl bold queue-listing-header">{{ connectionLabel }}</h3>
<div class="flex justify-between">
<div class="value">
Display {{ displayObjectCount }} of {{ formatCount(queueSizeObjectCount) }} ({{
formatBytes(queueSizeByteCount)
}}) bytes
</div>
<div class="listing-message">
<ng-container *ngIf="sourceRunning && destinationRunning; else bothNotRunning">
The source and destination of this queue are currently running. This listing may no longer be accurate.
</ng-container>
<ng-template #bothNotRunning>
<ng-container *ngIf="sourceRunning; else sourceNotRunning">
The source of this queue is currently running. This listing may no longer be accurate.
</ng-container>
</ng-template>
<ng-template #sourceNotRunning>
<ng-container *ngIf="destinationRunning">
The destination of this queue is currently running. This listing may no longer be accurate.
</ng-container>
</ng-template>
</div>
</div>
<div class="flex-1 relative">
<div class="listing-table border absolute inset-0 overflow-y-auto">
<table mat-table [dataSource]="dataSource">
<!-- More Details Column -->
<ng-container matColumnDef="moreDetails">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div
class="pointer fa fa-info-circle"
title="View Details"
(click)="viewFlowFileClicked(item)"></div>
</td>
</ng-container>
<!-- Position Column -->
<ng-container matColumnDef="position">
<th mat-header-cell *matHeaderCellDef>Position</th>
<td mat-cell *matCellDef="let item">
{{ item.position }}
</td>
</ng-container>
<!-- FlowFile Uuid Column -->
<ng-container matColumnDef="flowFileUuid">
<th mat-header-cell *matHeaderCellDef>UUID</th>
<td mat-cell *matCellDef="let item">
{{ item.uuid }}
</td>
</ng-container>
<!-- File Name Column -->
<ng-container matColumnDef="fileName">
<th mat-header-cell *matHeaderCellDef>Filename</th>
<td mat-cell *matCellDef="let item">
{{ item.filename }}
</td>
</ng-container>
<!-- File Size Column -->
<ng-container matColumnDef="fileSize">
<th mat-header-cell *matHeaderCellDef>File Size</th>
<td mat-cell *matCellDef="let item">
{{ formatBytes(item.size) }}
</td>
</ng-container>
<!-- Queued Duration Column -->
<ng-container matColumnDef="queuedDuration">
<th mat-header-cell *matHeaderCellDef>Queued Duration</th>
<td mat-cell *matCellDef="let item">
{{ formatDuration(item.queuedDuration) }}
</td>
</ng-container>
<!-- Lineage Duration Column -->
<ng-container matColumnDef="lineageDuration">
<th mat-header-cell *matHeaderCellDef>Lineage Duration</th>
<td mat-cell *matCellDef="let item">
{{ formatDuration(item.lineageDuration) }}
</td>
</ng-container>
<!-- Penalized Column -->
<ng-container matColumnDef="penalized">
<th mat-header-cell *matHeaderCellDef>Penalized</th>
<td mat-cell *matCellDef="let item">
{{ item.penalized ? 'Yes' : 'No' }}
</td>
</ng-container>
<!-- Actions Column -->
<ng-container matColumnDef="actions">
<th mat-header-cell *matHeaderCellDef></th>
<td mat-cell *matCellDef="let item">
<div class="flex items-center gap-x-3">
<div
*ngIf="item.size > 0"
class="pointer fa fa-download"
title="Download content"
(click)="downloadContentClicked(item)"></div>
<div
*ngIf="contentViewerAvailable && item.size > 0"
class="pointer fa fa-eye"
title="View content"
(click)="viewContentClicked(item)"></div>
<div
*ngIf="currentUser.provenancePermissions.canRead"
class="pointer icon icon-provenance"
title="Provenance"
[routerLink]="['/provenance']"
[queryParams]="{ flowFileUuid: item.uuid }"></div>
</div>
</td>
</ng-container>
<tr mat-header-row *matHeaderRowDef="displayedColumns; sticky: true"></tr>
<tr
mat-row
*matRowDef="let row; let even = even; columns: displayedColumns"
(click)="select(row)"
[class.selected]="isSelected(row)"
[class.even]="even"></tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.flowfile-table {
.queue-listing-header {
color: #728e9b;
}
.listing-message {
color: #ba554a;
}
.listing-table {
table {
.mat-column-moreDetails {
min-width: 50px;
width: 50px;
}
.mat-column-position {
min-width: 75px;
width: 75px;
}
.mat-column-penalized {
min-width: 85px;
width: 85px;
}
.mat-column-actions {
min-width: 100px;
width: 100px;
}
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { FlowFileTable } from './flowfile-table.component';
import { MatTableModule } from '@angular/material/table';
import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
describe('FlowFileTable', () => {
let component: FlowFileTable;
let fixture: ComponentFixture<FlowFileTable>;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [FlowFileTable, MatTableModule, BrowserAnimationsModule]
});
fixture = TestBed.createComponent(FlowFileTable);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { AfterViewInit, Component, EventEmitter, Input, Output } from '@angular/core';
import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { TextTip } from '../../../../../ui/common/tooltips/text-tip/text-tip.component';
import { BulletinsTip } from '../../../../../ui/common/tooltips/bulletins-tip/bulletins-tip.component';
import { ValidationErrorsTip } from '../../../../../ui/common/tooltips/validation-errors-tip/validation-errors-tip.component';
import { NiFiCommon } from '../../../../../service/nifi-common.service';
import { NgForOf, NgIf } from '@angular/common';
import { ProvenanceEventSummary } from '../../../../../state/shared';
import { RouterLink } from '@angular/router';
import { FlowFileSummary, ListingRequest } from '../../../state/queue-listing';
import { CurrentUser } from '../../../../../state/current-user';
import { Flow } from '../../../../flow-designer/state/flow';
@Component({
selector: 'flowfile-table',
standalone: true,
templateUrl: './flowfile-table.component.html',
imports: [MatTableModule, NgForOf, NgIf, RouterLink],
styleUrls: ['./flowfile-table.component.scss', '../../../../../../assets/styles/listing-table.scss']
})
export class FlowFileTable implements AfterViewInit {
@Input() connectionLabel!: string;
@Input() set listingRequest(listingRequest: ListingRequest) {
if (listingRequest.flowFileSummaries) {
this.dataSource.data = this.sortFlowFiles(listingRequest.flowFileSummaries);
this.displayObjectCount = this.dataSource.data.length;
this.queueSizeObjectCount = listingRequest.queueSize.objectCount;
this.queueSizeByteCount = listingRequest.queueSize.byteCount;
this.sourceRunning = listingRequest.sourceRunning;
this.destinationRunning = listingRequest.destinationRunning;
}
}
@Input() currentUser!: CurrentUser;
@Input() contentViewerAvailable!: boolean;
@Output() viewFlowFile: EventEmitter<FlowFileSummary> = new EventEmitter<FlowFileSummary>();
@Output() downloadContent: EventEmitter<FlowFileSummary> = new EventEmitter<FlowFileSummary>();
@Output() viewContent: EventEmitter<FlowFileSummary> = new EventEmitter<FlowFileSummary>();
protected readonly TextTip = TextTip;
protected readonly BulletinsTip = BulletinsTip;
protected readonly ValidationErrorsTip = ValidationErrorsTip;
// TODO - conditionally include the cluster column
displayedColumns: string[] = [
'moreDetails',
'position',
'flowFileUuid',
'fileName',
'fileSize',
'queuedDuration',
'lineageDuration',
'penalized',
'actions'
];
dataSource: MatTableDataSource<FlowFileSummary> = new MatTableDataSource<FlowFileSummary>();
selectedUuid: string | null = null;
sourceRunning: boolean = false;
destinationRunning: boolean = false;
displayObjectCount: number = 0;
queueSizeObjectCount: number = 0;
queueSizeByteCount: number = 0;
constructor(private nifiCommon: NiFiCommon) {}
ngAfterViewInit(): void {}
sortFlowFiles(summaries: FlowFileSummary[]): FlowFileSummary[] {
const data: FlowFileSummary[] = summaries.slice();
return data.sort((a: FlowFileSummary, b: FlowFileSummary) => {
const aIsUndefined: boolean = typeof a.position === 'undefined';
const bIsUndefined: boolean = typeof b.position === 'undefined';
if (aIsUndefined && bIsUndefined) {
return 0;
} else if (aIsUndefined) {
return 1;
} else if (bIsUndefined) {
return -1;
}
// @ts-ignore
return this.nifiCommon.compareNumber(a.position, b.position);
});
}
formatBytes(size: number): string {
return this.nifiCommon.formatDataSize(size);
}
formatCount(count: number): string {
return this.nifiCommon.formatInteger(count);
}
formatDuration(duration: number): string {
return this.nifiCommon.formatDuration(duration);
}
select(summary: FlowFileSummary): void {
this.selectedUuid = summary.uuid;
}
isSelected(summary: FlowFileSummary): boolean {
if (this.selectedUuid) {
return summary.uuid == this.selectedUuid;
}
return false;
}
viewFlowFileClicked(summary: FlowFileSummary): void {
this.viewFlowFile.next(summary);
}
downloadContentClicked(summary: FlowFileSummary): void {
this.downloadContent.next(summary);
}
viewContentClicked(summary: FlowFileSummary): void {
this.viewContent.next(summary);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { NgModule } from '@angular/core';
import { RouterModule, Routes } from '@angular/router';
import { QueueListing } from './queue-listing.component';
const routes: Routes = [
{
path: ':connectionId',
component: QueueListing
}
];
@NgModule({
imports: [RouterModule.forChild(routes)],
exports: [RouterModule]
})
export class QueueListingRoutingModule {}

View File

@ -0,0 +1,48 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<div class="flex flex-col gap-y-2 h-full" *ngIf="status$ | async; let status">
<div class="flex-1">
<div class="value" *ngIf="status === 'error'; else noError">
{{ error$ | async }}
</div>
<ng-template #noError>
<ng-container *ngIf="listingRequestEntity$ | async as entity; else initialLoading">
<flowfile-table
[connectionLabel]="(connectionLabel$ | async)!"
[listingRequest]="entity.listingRequest"
[currentUser]="(currentUser$ | async)!"
[contentViewerAvailable]="contentViewerAvailable((about$ | async)!)"
(viewFlowFile)="viewFlowFile($event)"
(downloadContent)="downloadContent($event)"
(viewContent)="viewContent($event)"></flowfile-table>
</ng-container>
<ng-template #initialLoading>
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>
</ng-template>
</ng-template>
</div>
<div class="flex justify-between">
<div class="flex items-center gap-x-2">
<button class="nifi-button" (click)="refreshClicked()">
<i class="fa fa-refresh" [class.fa-spin]="status === 'loading'"></i>
</button>
<div>Last updated:</div>
<div class="refresh-timestamp">{{ loadedTimestamp$ | async }}</div>
</div>
</div>
</div>

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { QueueListing } from './queue-listing.component';
import { provideMockStore } from '@ngrx/store/testing';
import { initialState } from '../../state/queue-listing/queue-listing.reducer';
describe('QueueListing', () => {
let component: QueueListing;
let fixture: ComponentFixture<QueueListing>;
beforeEach(() => {
TestBed.configureTestingModule({
declarations: [QueueListing],
providers: [provideMockStore({ initialState })]
});
fixture = TestBed.createComponent(QueueListing);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Component, OnDestroy } from '@angular/core';
import { Store } from '@ngrx/store';
import { distinctUntilChanged, filter } from 'rxjs';
import {
selectConnectionIdFromRoute,
selectConnectionLabel,
selectError,
selectListingRequestEntity,
selectLoadedTimestamp,
selectStatus
} from '../../state/queue-listing/queue-listing.selectors';
import { FlowFileSummary } from '../../state/queue-listing';
import {
downloadFlowFileContent,
loadConnectionLabel,
resetQueueListingState,
resubmitQueueListingRequest,
submitQueueListingRequest,
viewFlowFile,
viewFlowFileContent
} from '../../state/queue-listing/queue-listing.actions';
import { selectCurrentUser } from '../../../../state/current-user/current-user.selectors';
import { NiFiState } from '../../../../state';
import { selectAbout } from '../../../../state/about/about.selectors';
import { About } from '../../../../state/about';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({
selector: 'queue-listing',
templateUrl: './queue-listing.component.html',
styleUrls: ['./queue-listing.component.scss']
})
export class QueueListing implements OnDestroy {
status$ = this.store.select(selectStatus);
error$ = this.store.select(selectError);
connectionLabel$ = this.store.select(selectConnectionLabel);
loadedTimestamp$ = this.store.select(selectLoadedTimestamp);
listingRequestEntity$ = this.store.select(selectListingRequestEntity);
currentUser$ = this.store.select(selectCurrentUser);
about$ = this.store.select(selectAbout);
constructor(private store: Store<NiFiState>) {
this.store
.select(selectConnectionIdFromRoute)
.pipe(
filter((connectionId) => connectionId != null),
distinctUntilChanged(),
takeUntilDestroyed()
)
.subscribe((connectionId) => {
this.store.dispatch(
loadConnectionLabel({
request: {
connectionId
}
})
);
this.store.dispatch(
submitQueueListingRequest({
request: {
connectionId
}
})
);
});
}
refreshClicked(): void {
this.store.dispatch(resubmitQueueListingRequest());
}
contentViewerAvailable(about: About): boolean {
return about.contentViewerUrl != null;
}
viewFlowFile(flowfileSummary: FlowFileSummary): void {
this.store.dispatch(viewFlowFile({ request: { flowfileSummary } }));
}
downloadContent(flowfileSummary: FlowFileSummary): void {
this.store.dispatch(downloadFlowFileContent({ request: { flowfileSummary } }));
}
viewContent(flowfileSummary: FlowFileSummary): void {
this.store.dispatch(viewFlowFileContent({ request: { flowfileSummary } }));
}
ngOnDestroy(): void {
this.store.dispatch(resetQueueListingState());
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
import { QueueListing } from './queue-listing.component';
import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { NifiTooltipDirective } from '../../../../ui/common/tooltips/nifi-tooltip.directive';
import { QueueListingRoutingModule } from './queue-listing-routing.module';
import { FlowFileTable } from './flowfile-table/flowfile-table.component';
import { StoreModule } from '@ngrx/store';
import { EffectsModule } from '@ngrx/effects';
import { queueFeatureKey, reducers } from '../../state';
import { QueueListingEffects } from '../../state/queue-listing/queue-listing.effects';
@NgModule({
declarations: [QueueListing],
exports: [QueueListing],
imports: [
CommonModule,
QueueListingRoutingModule,
NgxSkeletonLoaderModule,
NifiTooltipDirective,
FlowFileTable,
StoreModule.forFeature(queueFeatureKey, reducers),
EffectsModule.forFeature(QueueListingEffects)
]
})
export class QueueListingModule {}

View File

@ -37,7 +37,7 @@ import { selectControllerService, selectControllerServiceSetEnableRequest } from
import { OkDialog } from '../../ui/common/ok-dialog/ok-dialog.component';
import { MatDialog } from '@angular/material/dialog';
import { ControllerServiceStateService } from '../../service/controller-service-state.service';
import { ControllerServiceEntity, ControllerServiceReferencingComponentEntity } from '../shared';
import { ControllerServiceEntity, ControllerServiceReferencingComponentEntity, isDefinedAndNotNull } from '../shared';
import { SetEnableRequest, SetEnableStep } from './index';
@Injectable()
@ -53,12 +53,10 @@ export class ControllerServiceStateEffects {
this.actions$.pipe(
ofType(ControllerServiceActions.submitEnableRequest),
map((action) => action.request),
withLatestFrom(this.store.select(selectControllerService)),
filter(([request, controllerService]) => !!controllerService),
withLatestFrom(this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
switchMap(([request, controllerService]) => {
if (
request.scope === 'SERVICE_AND_REFERENCING_COMPONENTS' &&
// @ts-ignore
this.hasUnauthorizedReferences(controllerService.component.referencingComponents)
) {
return of(
@ -79,10 +77,8 @@ export class ControllerServiceStateEffects {
submitDisableRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.submitDisableRequest),
withLatestFrom(this.store.select(selectControllerService)),
filter(([request, controllerService]) => !!controllerService),
withLatestFrom(this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
switchMap(([request, controllerService]) => {
// @ts-ignore
if (this.hasUnauthorizedReferences(controllerService.component.referencingComponents)) {
return of(
ControllerServiceActions.setEnableStepFailure({
@ -168,20 +164,16 @@ export class ControllerServiceStateEffects {
this.actions$.pipe(
ofType(ControllerServiceActions.pollControllerService),
withLatestFrom(
this.store.select(selectControllerService),
this.store.select(selectControllerService).pipe(isDefinedAndNotNull()),
this.store.select(selectControllerServiceSetEnableRequest)
),
filter(([action, controllerService, setEnableRequest]) => !!controllerService),
switchMap(([action, controllerService, setEnableRequest]) => {
// @ts-ignore
const cs: ControllerServiceEntity = controllerService;
return from(this.controllerServiceStateService.getControllerService(cs.id)).pipe(
switchMap(([action, controllerService, setEnableRequest]) =>
from(this.controllerServiceStateService.getControllerService(controllerService.id)).pipe(
map((response) =>
ControllerServiceActions.pollControllerServiceSuccess({
response: {
controllerService: response,
currentStep: this.getNextStep(setEnableRequest, cs)
currentStep: this.getNextStep(setEnableRequest, controllerService)
},
previousStep: setEnableRequest.currentStep
})
@ -196,8 +188,8 @@ export class ControllerServiceStateEffects {
})
)
)
);
})
)
)
)
);

View File

@ -15,6 +15,17 @@
* limitations under the License.
*/
import { filter, Observable } from 'rxjs';
export function isDefinedAndNotNull<T>() {
return (source$: Observable<null | undefined | T>) =>
source$.pipe(
filter((input: null | undefined | T): input is T => {
return input !== null && typeof input !== undefined;
})
);
}
export interface OkDialogRequest {
title: string;
message: string;

View File

@ -17,7 +17,7 @@
import { Component, EventEmitter, Inject, Input, Output } from '@angular/core';
import { MAT_DIALOG_DATA, MatDialogModule } from '@angular/material/dialog';
import { FormBuilder, FormsModule, ReactiveFormsModule } from '@angular/forms';
import { FormsModule } from '@angular/forms';
import { MatInputModule } from '@angular/material/input';
import { MatCheckboxModule } from '@angular/material/checkbox';
import { MatButtonModule } from '@angular/material/button';
@ -25,14 +25,13 @@ import { AsyncPipe, NgForOf, NgIf, NgTemplateOutlet } from '@angular/common';
import { MatDatepickerModule } from '@angular/material/datepicker';
import { NiFiCommon } from '../../../service/nifi-common.service';
import { MatTabsModule } from '@angular/material/tabs';
import { Attribute, ProvenanceEvent, ProvenanceEventDialogRequest } from '../../../state/shared';
import { Attribute, ProvenanceEventDialogRequest } from '../../../state/shared';
@Component({
selector: 'provenance-event-dialog',
standalone: true,
templateUrl: './provenance-event-dialog.component.html',
imports: [
ReactiveFormsModule,
MatDialogModule,
MatInputModule,
MatCheckboxModule,

View File

@ -46,14 +46,19 @@
background-color: #f4f6f7;
}
.fa,
.icon {
.fa {
color: #004849;
width: 10px;
height: 14px;
text-align: center;
}
.icon {
color: #004849;
width: 10px;
text-align: center;
}
.mat-column-moreDetails {
min-width: 30px;
}