NIFI-12604: Empty Queue (#8246)

* NIFI-12604:
- Empty Queue.
- Empty All Queues.

* NIFI-12611:
- Ensuring dialog is closed when an error occurs.
- Migrating from withLatestFrom to concatLatestFrom.

* NIFI-12611:
- Adding header/nav to the controller service listing page.

This closes #8246
This commit is contained in:
Matt Gilman 2024-01-17 10:24:51 -05:00 committed by GitHub
parent 1864a370bb
commit 02df7c13f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 896 additions and 263 deletions

View File

@ -16,12 +16,12 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import { Router } from '@angular/router';
import * as AccessPolicyActions from './access-policy.actions';
import { catchError, combineLatest, filter, from, map, of, switchMap, take, tap, withLatestFrom } from 'rxjs';
import { catchError, from, map, of, switchMap, take, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { AccessPolicyService } from '../../service/access-policy.service';
import { AccessPolicyEntity, ComponentResourceAction, PolicyStatus, ResourceAction } from '../shared';
@ -30,7 +30,6 @@ import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.c
import { isDefinedAndNotNull, TenantEntity } from '../../../../state/shared';
import { AddTenantToPolicyDialog } from '../../ui/common/add-tenant-to-policy-dialog/add-tenant-to-policy-dialog.component';
import { AddTenantsToPolicyRequest } from './index';
import { ComponentAccessPolicies } from '../../ui/component-access-policies/component-access-policies.component';
import { selectUserGroups, selectUsers } from '../tenants/tenants.selectors';
@Injectable()
@ -62,7 +61,7 @@ export class AccessPolicyEffects {
reloadAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.reloadAccessPolicy),
withLatestFrom(this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
switchMap(([action, resourceAction]) => {
return of(
AccessPolicyActions.loadAccessPolicy({
@ -137,7 +136,7 @@ export class AccessPolicyEffects {
createAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.createAccessPolicy),
withLatestFrom(this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectResourceAction).pipe(isDefinedAndNotNull())),
switchMap(([action, resourceAction]) =>
from(this.accessPoliciesService.createAccessPolicy(resourceAction)).pipe(
map((response) => {
@ -212,7 +211,7 @@ export class AccessPolicyEffects {
() =>
this.actions$.pipe(
ofType(AccessPolicyActions.openAddTenantToPolicyDialog),
withLatestFrom(this.store.select(selectAccessPolicy)),
concatLatestFrom(() => this.store.select(selectAccessPolicy)),
tap(([action, accessPolicy]) => {
const dialogReference = this.dialog.open(AddTenantToPolicyDialog, {
data: {
@ -239,7 +238,7 @@ export class AccessPolicyEffects {
this.actions$.pipe(
ofType(AccessPolicyActions.addTenantsToPolicy),
map((action) => action.request),
withLatestFrom(this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
switchMap(([request, accessPolicy]) => {
const users: TenantEntity[] = [...accessPolicy.component.users, ...request.users];
const userGroups: TenantEntity[] = [...accessPolicy.component.userGroups, ...request.userGroups];
@ -295,7 +294,7 @@ export class AccessPolicyEffects {
this.actions$.pipe(
ofType(AccessPolicyActions.removeTenantFromPolicy),
map((action) => action.request),
withLatestFrom(this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())),
switchMap(([request, accessPolicy]) => {
const users: TenantEntity[] = [...accessPolicy.component.users];
const userGroups: TenantEntity[] = [...accessPolicy.component.userGroups];
@ -364,10 +363,10 @@ export class AccessPolicyEffects {
deleteAccessPolicy$ = createEffect(() =>
this.actions$.pipe(
ofType(AccessPolicyActions.deleteAccessPolicy),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectResourceAction).pipe(isDefinedAndNotNull()),
this.store.select(selectAccessPolicy).pipe(isDefinedAndNotNull())
),
]),
switchMap(([action, resourceAction, accessPolicy]) =>
from(this.accessPoliciesService.deleteAccessPolicy(accessPolicy)).pipe(
map((response) => {

View File

@ -18,7 +18,7 @@
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import * as TenantsActions from './tenants.actions';
import { catchError, combineLatest, filter, from, map, of, switchMap, take, tap, withLatestFrom } from 'rxjs';
import { catchError, combineLatest, map, of, switchMap } from 'rxjs';
import { AccessPolicyService } from '../../service/access-policy.service';
@Injectable()

View File

@ -16,12 +16,12 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { Router } from '@angular/router';
import * as BulletinBoardActions from './bulletin-board.actions';
import { asyncScheduler, from, interval, map, of, switchMap, takeUntil, withLatestFrom } from 'rxjs';
import { asyncScheduler, from, interval, map, of, switchMap, takeUntil } from 'rxjs';
import { BulletinBoardService } from '../../service/bulletin-board.service';
import { selectBulletinBoardFilter, selectLastBulletinId } from './bulletin-board.selectors';
import { LoadBulletinBoardRequest } from './index';
@ -77,7 +77,10 @@ export class BulletinBoardEffects {
takeUntil(this.actions$.pipe(ofType(BulletinBoardActions.stopBulletinBoardPolling)))
)
),
withLatestFrom(this.store.select(selectBulletinBoardFilter), this.store.select(selectLastBulletinId)),
concatLatestFrom(() => [
this.store.select(selectBulletinBoardFilter),
this.store.select(selectLastBulletinId)
]),
switchMap(([, filter, lastBulletinId]) => {
const request: LoadBulletinBoardRequest = {};
if (lastBulletinId > 0) {

View File

@ -28,6 +28,7 @@ import { canvasFeatureKey, reducers } from '../state';
import { MatDialogModule } from '@angular/material/dialog';
import { ControllerServicesEffects } from '../state/controller-services/controller-services.effects';
import { ParameterEffects } from '../state/parameter/parameter.effects';
import { QueueEffects } from '../state/queue/queue.effects';
@NgModule({
declarations: [FlowDesigner, VersionControlTip],
@ -36,7 +37,13 @@ import { ParameterEffects } from '../state/parameter/parameter.effects';
CommonModule,
FlowDesignerRoutingModule,
StoreModule.forFeature(canvasFeatureKey, reducers),
EffectsModule.forFeature(FlowEffects, TransformEffects, ControllerServicesEffects, ParameterEffects),
EffectsModule.forFeature(
FlowEffects,
TransformEffects,
ControllerServicesEffects,
ParameterEffects,
QueueEffects
),
NgOptimizedImage,
MatDialogModule
]

View File

@ -33,6 +33,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('ConnectableBehavior', () => {
let service: ConnectableBehavior;
@ -42,7 +44,8 @@ describe('ConnectableBehavior', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('DraggableBehavior', () => {
let service: DraggableBehavior;
@ -43,7 +45,8 @@ describe('DraggableBehavior', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('EditableBehaviorService', () => {
let service: EditableBehavior;
@ -42,7 +44,8 @@ describe('EditableBehaviorService', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
beforeEach(() => {

View File

@ -33,6 +33,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('QuickSelectBehavior', () => {
let service: QuickSelectBehavior;
@ -42,7 +44,8 @@ describe('QuickSelectBehavior', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -32,6 +32,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('SelectableBehavior', () => {
let service: SelectableBehavior;
@ -41,7 +43,8 @@ describe('SelectableBehavior', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../state/parameter';
import * as fromParameter from '../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../queue/state';
import * as fromQueue from '../state/queue/queue.reducer';
describe('BirdseyeView', () => {
let service: BirdseyeView;
@ -43,7 +45,8 @@ describe('BirdseyeView', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -54,6 +54,7 @@ import {
ContextMenuDefinitionProvider,
ContextMenuItemDefinition
} from '../../../ui/common/context-menu/context-menu.component';
import { promptEmptyQueueRequest, promptEmptyQueuesRequest } from '../state/queue/queue.actions';
@Injectable({ providedIn: 'root' })
export class CanvasContextMenu implements ContextMenuDefinitionProvider {
@ -1045,33 +1046,44 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider {
},
{
condition: (selection: any) => {
// TODO - canEmptyQueue
return false;
return this.canvasUtils.isConnection(selection);
},
clazz: 'fa fa-minus-circle',
text: 'Empty queue',
action: () => {
// TODO - emptyQueue
action: (selection: any) => {
const selectionData = selection.datum();
this.store.dispatch(
promptEmptyQueueRequest({
request: {
connectionId: selectionData.id
}
})
);
}
},
{
condition: (selection: any) => {
return this.canvasUtils.isProcessGroup(selection);
return selection.empty() || this.canvasUtils.isProcessGroup(selection);
},
clazz: 'fa fa-minus-circle',
text: 'Empty all queues',
action: () => {
// TODO - emptyAllQueues in selected PG
}
},
{
condition: (selection: any) => {
return this.canvasUtils.emptySelection(selection);
},
clazz: 'fa fa-minus-circle',
text: 'Empty all queues',
action: () => {
// TODO - emptyAllQueues in current PG
action: (selection: any) => {
let processGroupId: string;
if (selection.empty()) {
processGroupId = this.canvasUtils.getProcessGroupId();
} else {
const selectionData = selection.datum();
processGroupId = selectionData.id;
}
this.store.dispatch(
promptEmptyQueuesRequest({
request: {
processGroupId
}
})
);
}
},
{

View File

@ -33,6 +33,8 @@ import { parameterFeatureKey } from '../state/parameter';
import * as fromParameter from '../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../queue/state';
import * as fromQueue from '../state/queue/queue.reducer';
describe('CanvasUtils', () => {
let service: CanvasUtils;
@ -42,7 +44,8 @@ describe('CanvasUtils', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../state/parameter';
import * as fromParameter from '../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../queue/state';
import * as fromQueue from '../state/queue/queue.reducer';
describe('CanvasView', () => {
let service: CanvasView;
@ -43,7 +45,8 @@ describe('CanvasView', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('ConnectionManager', () => {
let service: ConnectionManager;
@ -43,7 +45,8 @@ describe('ConnectionManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('FunnelManager', () => {
let service: FunnelManager;
@ -43,7 +45,8 @@ describe('FunnelManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('LabelManager', () => {
let service: LabelManager;
@ -43,7 +45,8 @@ describe('LabelManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('PortManager', () => {
let service: PortManager;
@ -43,7 +45,8 @@ describe('PortManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('ProcessGroupManager', () => {
let service: ProcessGroupManager;
@ -43,7 +45,8 @@ describe('ProcessGroupManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('ProcessorManager', () => {
let service: ProcessorManager;
@ -43,7 +45,8 @@ describe('ProcessorManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -34,6 +34,8 @@ import { parameterFeatureKey } from '../../state/parameter';
import * as fromParameter from '../../state/parameter/parameter.reducer';
import { selectFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.selectors';
import * as fromFlowConfiguration from '../../../../state/flow-configuration/flow-configuration.reducer';
import { queueFeatureKey } from '../../../queue/state';
import * as fromQueue from '../../state/queue/queue.reducer';
describe('RemoteProcessGroupManager', () => {
let service: RemoteProcessGroupManager;
@ -43,7 +45,8 @@ describe('RemoteProcessGroupManager', () => {
[flowFeatureKey]: fromFlow.initialState,
[transformFeatureKey]: fromTransform.initialState,
[controllerServicesFeatureKey]: fromControllerServices.initialState,
[parameterFeatureKey]: fromParameter.initialState
[parameterFeatureKey]: fromParameter.initialState,
[queueFeatureKey]: fromQueue.initialState
};
TestBed.configureTestingModule({

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { NiFiCommon } from '../../../service/nifi-common.service';
import { DropRequest, SubmitEmptyQueueRequest, SubmitEmptyQueuesRequest } from '../state/queue';
@Injectable({ providedIn: 'root' })
export class QueueService {
private static readonly API: string = '../nifi-api';
constructor(
private httpClient: HttpClient,
private nifiCommon: NiFiCommon
) {}
/**
* The NiFi model contain the url for each component. That URL is an absolute URL. Angular CSRF handling
* does not work on absolute URLs, so we need to strip off the proto for the request header to be added.
*
* https://stackoverflow.com/a/59586462
*
* @param url
* @private
*/
private stripProtocol(url: string): string {
return this.nifiCommon.substringAfterFirst(url, ':');
}
submitEmptyQueueRequest(emptyQueueRequest: SubmitEmptyQueueRequest): Observable<any> {
return this.httpClient.post(
`${QueueService.API}/flowfile-queues/${emptyQueueRequest.connectionId}/drop-requests`,
{}
);
}
submitEmptyQueuesRequest(emptyQueuesRequest: SubmitEmptyQueuesRequest): Observable<any> {
return this.httpClient.post(
`${QueueService.API}/process-groups/${emptyQueuesRequest.processGroupId}/empty-all-connections-requests`,
{}
);
}
pollEmptyQueueRequest(dropRequest: DropRequest): Observable<any> {
return this.httpClient.get(this.stripProtocol(dropRequest.uri));
}
deleteEmptyQueueRequest(dropRequest: DropRequest): Observable<any> {
return this.httpClient.delete(this.stripProtocol(dropRequest.uri));
}
}

View File

@ -16,7 +16,7 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ControllerServicesActions from './controller-services.actions';
import {
catchError,
@ -30,8 +30,7 @@ import {
switchMap,
take,
takeUntil,
tap,
withLatestFrom
tap
} from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
@ -119,10 +118,10 @@ export class ControllerServicesEffects {
() =>
this.actions$.pipe(
ofType(ControllerServicesActions.openNewControllerServiceDialog),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectControllerServiceTypes),
this.store.select(selectCurrentProcessGroupId)
),
]),
tap(([action, controllerServiceTypes, processGroupId]) => {
const dialogReference = this.dialog.open(CreateControllerService, {
data: {
@ -196,7 +195,7 @@ export class ControllerServicesEffects {
this.actions$.pipe(
ofType(ControllerServicesActions.navigateToEditService),
map((action) => action.id),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([id, processGroupId]) => {
this.router.navigate(['/process-groups', processGroupId, 'controller-services', id, 'edit']);
})
@ -209,10 +208,10 @@ export class ControllerServicesEffects {
this.actions$.pipe(
ofType(ControllerServicesActions.openConfigureControllerServiceDialog),
map((action) => action.request),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectCurrentParameterContext),
this.store.select(selectCurrentProcessGroupId)
),
]),
tap(([request, parameterContext, processGroupId]) => {
const serviceId: string = request.id;
@ -555,7 +554,7 @@ export class ControllerServicesEffects {
this.actions$.pipe(
ofType(ControllerServicesActions.openEnableControllerServiceDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([request, currentProcessGroupId]) => {
const serviceId: string = request.id;
@ -595,7 +594,7 @@ export class ControllerServicesEffects {
this.actions$.pipe(
ofType(ControllerServicesActions.openDisableControllerServiceDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([request, currentProcessGroupId]) => {
const serviceId: string = request.id;

View File

@ -17,7 +17,7 @@
import { Injectable } from '@angular/core';
import { FlowService } from '../../service/flow.service';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as FlowActions from './flow.actions';
import * as ParameterActions from '../parameter/parameter.actions';
import * as StatusHistoryActions from '../../../../state/status-history/status-history.actions';
@ -36,8 +36,7 @@ import {
switchMap,
take,
takeUntil,
tap,
withLatestFrom
tap
} from 'rxjs';
import {
CreateProcessGroupDialogRequest,
@ -124,7 +123,7 @@ export class FlowEffects {
reloadFlow$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.reloadFlow),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([action, processGroupId]) => {
return of(
FlowActions.loadProcessGroup({
@ -214,7 +213,7 @@ export class FlowEffects {
return of(FlowActions.openNewProcessorDialog({ request }));
case ComponentType.ProcessGroup:
return from(this.flowService.getParameterContexts()).pipe(
withLatestFrom(this.store.select(selectCurrentParameterContext)),
concatLatestFrom(() => this.store.select(selectCurrentParameterContext)),
map(([response, parameterContext]) => {
const dialogRequest: CreateProcessGroupDialogRequest = {
request,
@ -248,7 +247,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.openNewProcessorDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectProcessorTypes)),
concatLatestFrom(() => this.store.select(selectProcessorTypes)),
tap(([request, processorTypes]) => {
this.dialog
.open(CreateProcessor, {
@ -271,7 +270,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createProcessor),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createProcessor(processGroupId, request)).pipe(
map((response) =>
@ -312,7 +311,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createProcessGroup),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createProcessGroup(processGroupId, request)).pipe(
map((response) =>
@ -333,7 +332,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.uploadProcessGroup),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.uploadProcessGroup(processGroupId, request)).pipe(
map((response) =>
@ -354,10 +353,10 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.getParameterContextsAndOpenGroupComponentsDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, currentProcessGroupId]) =>
from(this.flowService.getParameterContexts()).pipe(
withLatestFrom(this.store.select(selectCurrentParameterContext)),
concatLatestFrom(() => this.store.select(selectCurrentParameterContext)),
map(([response, parameterContext]) => {
const dialogRequest: GroupComponentsDialogRequest = {
request,
@ -402,7 +401,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.groupComponents),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createProcessGroup(processGroupId, request)).pipe(
map((response) =>
@ -446,7 +445,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.getDefaultsAndOpenNewConnectionDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, currentProcessGroupId]) =>
from(this.flowService.getProcessGroup(currentProcessGroupId)).pipe(
map((response) =>
@ -505,7 +504,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createConnection),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createConnection(processGroupId, request)).pipe(
map((response) =>
@ -547,7 +546,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createPort),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createPort(processGroupId, request)).pipe(
map((response) =>
@ -568,7 +567,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createFunnel),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createFunnel(processGroupId, request)).pipe(
map((response) =>
@ -589,7 +588,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.createLabel),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) =>
from(this.flowService.createLabel(processGroupId, request)).pipe(
map((response) =>
@ -653,7 +652,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.navigateToEditComponent),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([request, processGroupId]) => {
this.router.navigate(['/process-groups', processGroupId, request.type, request.id, 'edit']);
})
@ -665,7 +664,7 @@ export class FlowEffects {
() =>
this.actions$.pipe(
ofType(FlowActions.navigateToEditCurrentProcessGroup),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([action, processGroupId]) => {
this.router.navigate(['/process-groups', processGroupId, 'edit']);
})
@ -702,7 +701,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.navigateToViewStatusHistoryForComponent),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([request, currentProcessGroupId]) => {
this.router.navigate([
'/process-groups',
@ -838,10 +837,10 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.openEditProcessorDialog),
map((action) => action.request),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectCurrentParameterContext),
this.store.select(selectCurrentProcessGroupId)
),
]),
tap(([request, parameterContext, processGroupId]) => {
const processorId: string = request.entity.id;
@ -1190,7 +1189,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.openEditProcessGroupDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, currentProcessGroupId]) =>
this.flowService.getParameterContexts().pipe(
take(1),
@ -1616,7 +1615,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.moveComponents),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
mergeMap(([request, processGroupId]) => {
const components: any[] = request.components;
@ -1690,7 +1689,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.deleteComponents),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
mergeMap(([requests, processGroupId]) => {
if (requests.length === 1) {
return from(this.flowService.deleteComponent(requests[0])).pipe(
@ -1828,7 +1827,7 @@ export class FlowEffects {
() =>
this.actions$.pipe(
ofType(FlowActions.leaveProcessGroup),
withLatestFrom(this.store.select(selectParentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectParentProcessGroupId)),
filter(([action, parentProcessGroupId]) => parentProcessGroupId != null),
tap(([action, parentProcessGroupId]) => {
this.router.navigate(['/process-groups', parentProcessGroupId]);
@ -1841,10 +1840,10 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.addSelectedComponents),
map((action) => action.request),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectCurrentProcessGroupId),
this.store.select(selectAnySelectedComponentIds)
),
]),
switchMap(([request, processGroupId, selected]) => {
let commands: string[] = [];
if (selected.length === 0) {
@ -1873,10 +1872,10 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.removeSelectedComponents),
map((action) => action.request),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectCurrentProcessGroupId),
this.store.select(selectAnySelectedComponentIds)
),
]),
switchMap(([request, processGroupId, selected]) => {
let commands: string[];
if (selected.length === 0) {
@ -1895,7 +1894,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.selectComponents),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([request, processGroupId]) => {
let commands: string[] = [];
if (request.components.length === 1) {
@ -1917,7 +1916,7 @@ export class FlowEffects {
deselectAllComponent$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.deselectAllComponents),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([action, processGroupId]) => {
return of(FlowActions.navigateWithoutTransform({ url: ['/process-groups', processGroupId] }));
})
@ -1929,7 +1928,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.navigateToComponent),
map((action) => action.request),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([request, currentProcessGroupId]) => {
if (request.processGroupId) {
this.router.navigate(['/process-groups', request.processGroupId, request.type, request.id]);
@ -2072,7 +2071,7 @@ export class FlowEffects {
startCurrentProcessGroup$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.startCurrentProcessGroup),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([, pgId]) => {
return of(
FlowActions.startComponent({
@ -2159,7 +2158,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.startComponentSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
filter(([response, currentPg]) => response.component.id === currentPg),
switchMap(() => of(FlowActions.reloadFlow()))
)
@ -2172,7 +2171,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.startComponentSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
filter(([response]) => response.type === ComponentType.ProcessGroup),
filter(([response, currentPg]) => response.component.id !== currentPg),
switchMap(([response]) =>
@ -2190,7 +2189,7 @@ export class FlowEffects {
stopCurrentProcessGroup$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.stopCurrentProcessGroup),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
switchMap(([, pgId]) => {
return of(
FlowActions.stopComponent({
@ -2276,7 +2275,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.stopComponentSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
filter(([response, currentPg]) => response.component.id === currentPg),
switchMap(() => of(FlowActions.reloadFlow()))
)
@ -2289,7 +2288,7 @@ export class FlowEffects {
this.actions$.pipe(
ofType(FlowActions.stopComponentSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
filter(([response]) => response.type === ComponentType.ProcessGroup),
filter(([response, currentPg]) => response.component.id !== currentPg),
switchMap(([response]) =>

View File

@ -28,6 +28,9 @@ import { controllerServicesFeatureKey, ControllerServicesState } from './control
import { controllerServicesReducer } from './controller-services/controller-services.reducer';
import { parameterFeatureKey, ParameterState } from './parameter';
import { parameterReducer } from './parameter/parameter.reducer';
import { queueFeatureKey } from '../../queue/state';
import { QueueState } from './queue';
import { queueReducer } from './queue/queue.reducer';
export const canvasFeatureKey = 'canvas';
@ -36,6 +39,7 @@ export interface CanvasState {
[transformFeatureKey]: CanvasTransform;
[controllerServicesFeatureKey]: ControllerServicesState;
[parameterFeatureKey]: ParameterState;
[queueFeatureKey]: QueueState;
}
export function reducers(state: CanvasState | undefined, action: Action) {
@ -43,7 +47,8 @@ export function reducers(state: CanvasState | undefined, action: Action) {
[flowFeatureKey]: flowReducer,
[transformFeatureKey]: transformReducer,
[controllerServicesFeatureKey]: controllerServicesReducer,
[parameterFeatureKey]: parameterReducer
[parameterFeatureKey]: parameterReducer,
[queueFeatureKey]: queueReducer
})(state, action);
}

View File

@ -16,23 +16,11 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ParameterActions from './parameter.actions';
import { Store } from '@ngrx/store';
import { CanvasState } from '../index';
import {
asyncScheduler,
catchError,
from,
interval,
map,
NEVER,
of,
switchMap,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { asyncScheduler, catchError, from, interval, map, NEVER, of, switchMap, takeUntil, tap } from 'rxjs';
import { ParameterContextUpdateRequest } from '../../../../state/shared';
import { selectUpdateRequest } from './parameter.selectors';
import { ParameterService } from '../../service/parameter.service';
@ -100,7 +88,7 @@ export class ParameterEffects {
pollParameterContextUpdateRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(ParameterActions.pollParameterContextUpdateRequest),
withLatestFrom(this.store.select(selectUpdateRequest)),
concatLatestFrom(() => this.store.select(selectUpdateRequest)),
switchMap(([action, updateRequest]) => {
if (updateRequest) {
return from(this.parameterService.pollParameterContextUpdate(updateRequest.request)).pipe(
@ -151,7 +139,7 @@ export class ParameterEffects {
deleteParameterContextUpdateRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(ParameterActions.deleteParameterContextUpdateRequest),
withLatestFrom(this.store.select(selectUpdateRequest)),
concatLatestFrom(() => this.store.select(selectUpdateRequest)),
tap(([action, updateRequest]) => {
if (updateRequest) {
this.parameterService.deleteParameterContextUpdate(updateRequest.request).subscribe();

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export interface DropRequest {
id: string;
uri: string;
submissionTime: string;
lastUpdated: string;
percentCompleted: number;
finished: boolean;
failureReason: string;
currentCount: number;
currentSize: number;
current: string;
originalCount: number;
originalSize: number;
original: string;
droppedCount: number;
droppedSize: number;
dropped: string;
state: string;
}
export interface DropRequestEntity {
dropRequest: DropRequest;
}
export interface SubmitEmptyQueueRequest {
connectionId: string;
}
export interface SubmitEmptyQueuesRequest {
processGroupId: string;
}
export interface PollEmptyQueueSuccess {
dropEntity: DropRequestEntity;
}
export interface ShowEmptyQueueResults {
dropEntity: DropRequestEntity;
}
export interface QueueState {
dropEntity: DropRequestEntity | null;
connectionId: string | null;
processGroupId: string | null;
loadedTimestamp: string;
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createAction, props } from '@ngrx/store';
import {
PollEmptyQueueSuccess,
ShowEmptyQueueResults,
SubmitEmptyQueueRequest,
SubmitEmptyQueuesRequest
} from './index';
const QUEUE_PREFIX = '[Queue]';
export const queueApiError = createAction(`${QUEUE_PREFIX} Queue Error`, props<{ error: string }>());
export const resetQueueState = createAction(`${QUEUE_PREFIX} Reset Queue State`);
export const promptEmptyQueueRequest = createAction(
`${QUEUE_PREFIX} Prompt Empty Queue Request`,
props<{ request: SubmitEmptyQueueRequest }>()
);
export const submitEmptyQueueRequest = createAction(
`${QUEUE_PREFIX} Submit Empty Queue Request`,
props<{ request: SubmitEmptyQueueRequest }>()
);
export const promptEmptyQueuesRequest = createAction(
`${QUEUE_PREFIX} Prompt Empty Queues Request`,
props<{ request: SubmitEmptyQueuesRequest }>()
);
export const submitEmptyQueuesRequest = createAction(
`${QUEUE_PREFIX} Submit Empty Queues Request`,
props<{ request: SubmitEmptyQueuesRequest }>()
);
export const submitEmptyQueueRequestSuccess = createAction(
`${QUEUE_PREFIX} Submit Empty Queue Request Success`,
props<{ response: PollEmptyQueueSuccess }>()
);
export const startPollingEmptyQueueRequest = createAction(`${QUEUE_PREFIX} Start Polling Empty Queue Request`);
export const pollEmptyQueueRequest = createAction(`${QUEUE_PREFIX} Poll Empty Queue Request`);
export const pollEmptyQueueRequestSuccess = createAction(
`${QUEUE_PREFIX} Poll Empty Queue Request Success`,
props<{ response: PollEmptyQueueSuccess }>()
);
export const stopPollingEmptyQueueRequest = createAction(`${QUEUE_PREFIX} Stop Polling Empty Queue Request`);
export const deleteEmptyQueueRequest = createAction(`${QUEUE_PREFIX} Delete Empty Queue Request`);
export const showEmptyQueueResults = createAction(
`${QUEUE_PREFIX} Show Empty Queue Results`,
props<{ request: ShowEmptyQueueResults }>()
);

View File

@ -0,0 +1,341 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as QueueActions from './queue.actions';
import { Store } from '@ngrx/store';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { selectDropConnectionId, selectDropProcessGroupId, selectDropRequestEntity } from './queue.selectors';
import { QueueService } from '../../service/queue.service';
import { DropRequest } from './index';
import { CancelDialog } from '../../../../ui/common/cancel-dialog/cancel-dialog.component';
import { MatDialog } from '@angular/material/dialog';
import { NiFiCommon } from '../../../../service/nifi-common.service';
import { isDefinedAndNotNull } from '../../../../state/shared';
import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
import { loadConnection, loadProcessGroup } from '../flow/flow.actions';
import { resetQueueState } from './queue.actions';
@Injectable()
export class QueueEffects {
constructor(
private actions$: Actions,
private store: Store<CanvasState>,
private queueService: QueueService,
private dialog: MatDialog,
private nifiCommon: NiFiCommon
) {}
promptEmptyQueueRequest$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueActions.promptEmptyQueueRequest),
map((action) => action.request),
tap((request) => {
const dialogReference = this.dialog.open(YesNoDialog, {
data: {
title: 'Empty Queue',
message:
'Are you sure you want to empty this queue? All FlowFiles waiting at the time of the request will be removed.'
},
panelClass: 'small-dialog'
});
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
this.store.dispatch(
QueueActions.submitEmptyQueueRequest({
request
})
);
});
})
),
{ dispatch: false }
);
submitEmptyQueueRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.submitEmptyQueueRequest),
map((action) => action.request),
switchMap((request) => {
const dialogReference = this.dialog.open(CancelDialog, {
data: {
title: 'Empty Queue',
message: 'Waiting for queue to empty...'
},
disableClose: true,
panelClass: 'small-dialog'
});
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
});
return from(this.queueService.submitEmptyQueueRequest(request)).pipe(
map((response) =>
QueueActions.submitEmptyQueueRequestSuccess({
response: {
dropEntity: response
}
})
),
catchError((error) =>
of(
QueueActions.queueApiError({
error: error.error
})
)
)
);
})
)
);
promptEmptyQueuesRequest$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueActions.promptEmptyQueuesRequest),
map((action) => action.request),
tap((request) => {
const dialogReference = this.dialog.open(YesNoDialog, {
data: {
title: 'Empty All Queues',
message:
'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.'
},
panelClass: 'small-dialog'
});
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
this.store.dispatch(
QueueActions.submitEmptyQueuesRequest({
request
})
);
});
})
),
{ dispatch: false }
);
submitEmptyQueuesRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.submitEmptyQueuesRequest),
map((action) => action.request),
switchMap((request) => {
const dialogReference = this.dialog.open(CancelDialog, {
data: {
title: 'Empty All Queues',
message: 'Waiting for all queues to empty...'
},
disableClose: true,
panelClass: 'small-dialog'
});
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
});
return from(this.queueService.submitEmptyQueuesRequest(request)).pipe(
map((response) =>
QueueActions.submitEmptyQueueRequestSuccess({
response: {
dropEntity: response
}
})
),
catchError((error) =>
of(
QueueActions.queueApiError({
error: error.error
})
)
)
);
})
)
);
submitEmptyQueueRequestSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.submitEmptyQueueRequestSuccess),
map((action) => action.response),
switchMap((response) => {
const dropRequest: DropRequest = response.dropEntity.dropRequest;
if (dropRequest.finished) {
return of(QueueActions.deleteEmptyQueueRequest());
} else {
return of(QueueActions.startPollingEmptyQueueRequest());
}
})
)
);
startPollingEmptyQueueRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.startPollingEmptyQueueRequest),
switchMap(() =>
interval(2000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(QueueActions.stopPollingEmptyQueueRequest)))
)
),
switchMap(() => of(QueueActions.pollEmptyQueueRequest()))
)
);
pollEmptyQueueRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.pollEmptyQueueRequest),
concatLatestFrom(() => this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
switchMap(([action, dropEntity]) => {
return from(this.queueService.pollEmptyQueueRequest(dropEntity.dropRequest)).pipe(
map((response) =>
QueueActions.pollEmptyQueueRequestSuccess({
response: {
dropEntity: response
}
})
),
catchError((error) =>
of(
QueueActions.queueApiError({
error: error.error
})
)
)
);
})
)
);
pollEmptyQueueRequestSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.pollEmptyQueueRequestSuccess),
map((action) => action.response),
filter((response) => response.dropEntity.dropRequest.finished),
switchMap((response) => of(QueueActions.stopPollingEmptyQueueRequest()))
)
);
stopPollingEmptyQueueRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.stopPollingEmptyQueueRequest),
switchMap((response) => of(QueueActions.deleteEmptyQueueRequest()))
)
);
deleteEmptyQueueRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueActions.deleteEmptyQueueRequest),
concatLatestFrom(() => this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
switchMap(([action, dropEntity]) => {
this.dialog.closeAll();
return from(this.queueService.deleteEmptyQueueRequest(dropEntity.dropRequest)).pipe(
map((response) =>
QueueActions.showEmptyQueueResults({
request: {
dropEntity: response
}
})
),
catchError((error) =>
of(
QueueActions.showEmptyQueueResults({
request: {
dropEntity
}
})
)
)
);
})
)
);
showEmptyQueueResults$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueActions.showEmptyQueueResults),
map((action) => action.request),
concatLatestFrom(() => [
this.store.select(selectDropConnectionId),
this.store.select(selectDropProcessGroupId)
]),
tap(([request, connectionId, processGroupId]) => {
const dropRequest: DropRequest = request.dropEntity.dropRequest;
const droppedTokens: string[] = dropRequest.dropped.split(/ \/ /);
let message: string = `${droppedTokens[0]} FlowFiles (${droppedTokens[1]})`;
if (dropRequest.percentCompleted < 100) {
const originalTokens: string[] = dropRequest.original.split(/ \/ /);
message = `${message} out of ${originalTokens[0]} (${originalTokens[1]})`;
}
if (connectionId) {
message = `${message} were removed from the queue.`;
this.store.dispatch(
loadConnection({
id: connectionId
})
);
} else if (processGroupId) {
message = `${message} were removed from the queues.`;
this.store.dispatch(
loadProcessGroup({
request: {
id: processGroupId,
transitionRequired: false
}
})
);
}
if (dropRequest.failureReason) {
message = `${message} ${dropRequest.failureReason}`;
}
const dialogReference = this.dialog.open(OkDialog, {
data: {
title: 'Empty Queue',
message
},
panelClass: 'small-dialog'
});
dialogReference.afterClosed().subscribe(() => {
this.store.dispatch(resetQueueState());
});
})
),
{ dispatch: false }
);
queueApiError$ = createEffect(
() =>
this.actions$.pipe(
ofType(QueueActions.queueApiError),
tap((action) => this.dialog.closeAll())
),
{ dispatch: false }
);
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createReducer, on } from '@ngrx/store';
import { QueueState } from './index';
import {
pollEmptyQueueRequestSuccess,
submitEmptyQueueRequest,
submitEmptyQueueRequestSuccess,
resetQueueState,
queueApiError,
submitEmptyQueuesRequest
} from './queue.actions';
export const initialState: QueueState = {
dropEntity: null,
processGroupId: null,
connectionId: null,
loadedTimestamp: 'N/A',
error: null,
status: 'pending'
};
export const queueReducer = createReducer(
initialState,
on(submitEmptyQueueRequest, (state, { request }) => ({
...state,
connectionId: request.connectionId,
status: 'loading' as const
})),
on(submitEmptyQueuesRequest, (state, { request }) => ({
...state,
processGroupId: request.processGroupId,
status: 'loading' as const
})),
on(submitEmptyQueueRequestSuccess, pollEmptyQueueRequestSuccess, (state, { response }) => ({
...state,
dropEntity: response.dropEntity,
loadedTimestamp: response.dropEntity.dropRequest.lastUpdated,
error: null,
status: 'success' as const
})),
on(queueApiError, (state, { error }) => ({
...state,
error,
status: 'error' as const
})),
on(resetQueueState, (state) => ({
...initialState
}))
);

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createSelector } from '@ngrx/store';
import { queueFeatureKey } from '../../../queue/state';
import { QueueState } from './index';
import { CanvasState, selectCanvasState } from '../index';
export const selectQueueState = createSelector(selectCanvasState, (state: CanvasState) => state[queueFeatureKey]);
export const selectDropRequestEntity = createSelector(selectQueueState, (state: QueueState) => state.dropEntity);
export const selectDropConnectionId = createSelector(selectQueueState, (state: QueueState) => state.connectionId);
export const selectDropProcessGroupId = createSelector(selectQueueState, (state: QueueState) => state.processGroupId);

View File

@ -16,9 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as TransformActions from './transform.actions';
import { map, tap, withLatestFrom } from 'rxjs';
import { map, tap } from 'rxjs';
import { Storage } from '../../../../service/storage.service';
import { selectCurrentProcessGroupId } from '../flow/flow.selectors';
import { Store } from '@ngrx/store';
@ -43,7 +43,7 @@ export class TransformEffects {
this.actions$.pipe(
ofType(TransformActions.transformComplete),
map((action) => action.transform),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([transform, processGroupId]) => {
const name: string = TransformEffects.VIEW_PREFIX + processGroupId;
@ -65,7 +65,7 @@ export class TransformEffects {
() =>
this.actions$.pipe(
ofType(TransformActions.restoreViewport),
withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
concatLatestFrom(() => this.store.select(selectCurrentProcessGroupId)),
tap(([action, processGroupId]) => {
try {
// see if we can restore the view position from storage

View File

@ -56,13 +56,14 @@ import {
selectSkipTransform,
selectViewStatusHistoryComponent
} from '../../state/flow/flow.selectors';
import { filter, map, switchMap, take, withLatestFrom } from 'rxjs';
import { filter, map, switchMap, take } from 'rxjs';
import { restoreViewport, zoomFit } from '../../state/transform/transform.actions';
import { ComponentType, isDefinedAndNotNull } from '../../../../state/shared';
import { initialState } from '../../state/flow/flow.reducer';
import { CanvasContextMenu } from '../../service/canvas-context-menu.service';
import { getStatusHistoryAndOpenDialog } from '../../../../state/status-history/status-history.actions';
import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.actions';
import { concatLatestFrom } from '@ngrx/effects';
@Component({
selector: 'fd-canvas',
@ -114,7 +115,7 @@ export class Canvas implements OnInit, OnDestroy {
filter((processGroupId) => processGroupId != initialState.id),
switchMap(() => this.store.select(selectProcessGroupRoute)),
filter((processGroupRoute) => processGroupRoute != null),
withLatestFrom(this.store.select(selectSkipTransform)),
concatLatestFrom(() => this.store.select(selectSkipTransform)),
takeUntilDestroyed()
)
.subscribe(([status, skipTransform]) => {
@ -132,7 +133,7 @@ export class Canvas implements OnInit, OnDestroy {
filter((processGroupId) => processGroupId != initialState.id),
switchMap(() => this.store.select(selectSingleSelectedComponent)),
filter((selectedComponent) => selectedComponent != null),
withLatestFrom(this.store.select(selectSkipTransform)),
concatLatestFrom(() => this.store.select(selectSkipTransform)),
takeUntilDestroyed()
)
.subscribe(([selectedComponent, skipTransform]) => {
@ -150,7 +151,7 @@ export class Canvas implements OnInit, OnDestroy {
filter((processGroupId) => processGroupId != initialState.id),
switchMap(() => this.store.select(selectBulkSelectedComponentIds)),
filter((ids) => ids.length > 0),
withLatestFrom(this.store.select(selectSkipTransform)),
concatLatestFrom(() => this.store.select(selectSkipTransform)),
takeUntilDestroyed()
)
.subscribe(([ids, skipTransform]) => {

View File

@ -15,14 +15,12 @@
~ limitations under the License.
-->
<div class="p-4 flex flex-col h-screen justify-between gap-y-5">
<div class="flex justify-between">
<div class="pb-5 flex flex-col h-screen justify-between gap-y-5">
<header class="nifi-header">
<navigation></navigation>
</header>
<div class="px-5 flex-1 flex flex-col">
<h3 class="text-xl bold controller-services-header">Controller Services</h3>
<button class="nifi-button" [routerLink]="['/']">
<i class="fa fa-times"></i>
</button>
</div>
<div class="flex-1 flex flex-col">
<ng-container *ngIf="serviceState$ | async; let serviceState">
<div *ngIf="isInitialLoading(serviceState); else loaded">
<ngx-skeleton-loader count="3"></ngx-skeleton-loader>

View File

@ -22,6 +22,7 @@ import { NgxSkeletonLoaderModule } from 'ngx-skeleton-loader';
import { ControllerServiceTable } from '../../../../ui/common/controller-service/controller-service-table/controller-service-table.component';
import { ControllerServicesRoutingModule } from './controller-services-routing.module';
import { Breadcrumbs } from '../common/breadcrumbs/breadcrumbs.component';
import { Navigation } from '../../../../ui/common/navigation/navigation.component';
@NgModule({
declarations: [ControllerServices],
@ -31,7 +32,8 @@ import { Breadcrumbs } from '../common/breadcrumbs/breadcrumbs.component';
NgxSkeletonLoaderModule,
ControllerServicesRoutingModule,
ControllerServiceTable,
Breadcrumbs
Breadcrumbs,
Navigation
]
})
export class ControllerServicesModule {}

View File

@ -16,7 +16,7 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ParameterContextListingActions from './parameter-context-listing.actions';
import {
asyncScheduler,
@ -30,8 +30,7 @@ import {
switchMap,
take,
takeUntil,
tap,
withLatestFrom
tap
} from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
@ -370,7 +369,7 @@ export class ParameterContextListingEffects {
pollParameterContextUpdateRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(ParameterContextListingActions.pollParameterContextUpdateRequest),
withLatestFrom(this.store.select(selectUpdateRequest)),
concatLatestFrom(() => this.store.select(selectUpdateRequest)),
switchMap(([action, updateRequest]) => {
if (updateRequest) {
return from(this.parameterContextService.pollParameterContextUpdate(updateRequest.request)).pipe(
@ -422,7 +421,7 @@ export class ParameterContextListingEffects {
() =>
this.actions$.pipe(
ofType(ParameterContextListingActions.deleteParameterContextUpdateRequest),
withLatestFrom(this.store.select(selectUpdateRequest)),
concatLatestFrom(() => this.store.select(selectUpdateRequest)),
tap(([action, updateRequest]) => {
if (updateRequest) {
this.parameterContextService.deleteParameterContextUpdate(updateRequest.request).subscribe();

View File

@ -16,23 +16,10 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as LineageActions from './lineage.actions';
import * as ProvenanceActions from '../provenance-event-listing/provenance-event-listing.actions';
import {
asyncScheduler,
catchError,
from,
interval,
map,
NEVER,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { asyncScheduler, catchError, from, interval, map, NEVER, of, switchMap, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
@ -113,7 +100,7 @@ export class LineageEffects {
pollLineageQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(LineageActions.pollLineageQuery),
withLatestFrom(this.store.select(selectLineageId), this.store.select(selectClusterNodeId)),
concatLatestFrom(() => [this.store.select(selectLineageId), this.store.select(selectClusterNodeId)]),
switchMap(([action, id, clusterNodeId]) => {
if (id) {
return from(this.provenanceService.getLineageQuery(id, clusterNodeId)).pipe(
@ -166,7 +153,7 @@ export class LineageEffects {
() =>
this.actions$.pipe(
ofType(LineageActions.deleteLineageQuery),
withLatestFrom(this.store.select(selectLineageId), this.store.select(selectClusterNodeId)),
concatLatestFrom(() => [this.store.select(selectLineageId), this.store.select(selectClusterNodeId)]),
tap(([action, id, clusterNodeId]) => {
if (id) {
this.provenanceService.deleteLineageQuery(id, clusterNodeId).subscribe();

View File

@ -16,22 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ProvenanceEventListingActions from './provenance-event-listing.actions';
import {
asyncScheduler,
catchError,
from,
interval,
map,
NEVER,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { asyncScheduler, catchError, from, interval, map, NEVER, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
@ -169,7 +156,7 @@ export class ProvenanceEventListingEffects {
pollProvenanceQuery$ = createEffect(() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.pollProvenanceQuery),
withLatestFrom(this.store.select(selectProvenanceId), this.store.select(selectClusterNodeId)),
concatLatestFrom(() => [this.store.select(selectProvenanceId), this.store.select(selectClusterNodeId)]),
switchMap(([action, id, clusterNodeId]) => {
if (id) {
return from(this.provenanceService.getProvenanceQuery(id, clusterNodeId)).pipe(
@ -222,7 +209,7 @@ export class ProvenanceEventListingEffects {
() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.deleteProvenanceQuery),
withLatestFrom(this.store.select(selectProvenanceId), this.store.select(selectClusterNodeId)),
concatLatestFrom(() => [this.store.select(selectProvenanceId), this.store.select(selectClusterNodeId)]),
tap(([action, id, clusterNodeId]) => {
if (id) {
this.provenanceService.deleteProvenanceQuery(id, clusterNodeId).subscribe();
@ -236,12 +223,12 @@ export class ProvenanceEventListingEffects {
() =>
this.actions$.pipe(
ofType(ProvenanceEventListingActions.openSearchDialog),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectTimeOffset),
this.store.select(selectProvenanceOptions),
this.store.select(selectProvenanceRequest),
this.store.select(selectAbout)
),
]),
tap(([request, timeOffset, options, currentRequest, about]) => {
if (about) {
const dialogReference = this.dialog.open(ProvenanceSearchDialog, {
@ -288,7 +275,7 @@ export class ProvenanceEventListingEffects {
this.actions$.pipe(
ofType(ProvenanceEventListingActions.openProvenanceEventDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout)),
concatLatestFrom(() => this.store.select(selectAbout)),
tap(([request, about]) => {
this.provenanceService.getProvenanceEvent(request.id).subscribe({
next: (response) => {

View File

@ -30,57 +30,57 @@ import {
const QUEUE_PREFIX = '[Queue Listing]';
export const loadConnectionLabel = createAction(
`[${QUEUE_PREFIX}] Load Connection Label`,
`${QUEUE_PREFIX} Load Connection Label`,
props<{ request: LoadConnectionLabelRequest }>()
);
export const loadConnectionLabelSuccess = createAction(
`[${QUEUE_PREFIX}] Load Connection Label Success`,
`${QUEUE_PREFIX} Load Connection Label Success`,
props<{ response: LoadConnectionLabelResponse }>()
);
export const queueListingApiError = createAction(`[${QUEUE_PREFIX}] Queue Error`, props<{ error: string }>());
export const queueListingApiError = createAction(`${QUEUE_PREFIX} Queue Error`, props<{ error: string }>());
export const resetQueueListingState = createAction(`[${QUEUE_PREFIX}] Reset Queue Listing State`);
export const resetQueueListingState = createAction(`${QUEUE_PREFIX} Reset Queue Listing State`);
export const submitQueueListingRequest = createAction(
`[${QUEUE_PREFIX}] Submit Queue Listing Request`,
`${QUEUE_PREFIX} Submit Queue Listing Request`,
props<{ request: SubmitQueueListingRequest }>()
);
export const resubmitQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Resubmit Queue Listing Request`);
export const resubmitQueueListingRequest = createAction(`${QUEUE_PREFIX} Resubmit Queue Listing Request`);
export const submitQueueListingRequestSuccess = createAction(
`[${QUEUE_PREFIX}] Submit Queue Listing Request Success`,
`${QUEUE_PREFIX} Submit Queue Listing Request Success`,
props<{ response: PollQueueListingSuccess }>()
);
export const startPollingQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Start Polling Queue Listing Request`);
export const startPollingQueueListingRequest = createAction(`${QUEUE_PREFIX} Start Polling Queue Listing Request`);
export const pollQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Poll Queue Listing Request`);
export const pollQueueListingRequest = createAction(`${QUEUE_PREFIX} Poll Queue Listing Request`);
export const pollQueueListingRequestSuccess = createAction(
`[${QUEUE_PREFIX}] Poll Queue Listing Request Success`,
`${QUEUE_PREFIX} Poll Queue Listing Request Success`,
props<{ response: PollQueueListingSuccess }>()
);
export const stopPollingQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Stop Polling Queue Listing Request`);
export const stopPollingQueueListingRequest = createAction(`${QUEUE_PREFIX} Stop Polling Queue Listing Request`);
export const deleteQueueListingRequest = createAction(`[${QUEUE_PREFIX}] Delete Queue Listing Request`);
export const deleteQueueListingRequest = createAction(`${QUEUE_PREFIX} Delete Queue Listing Request`);
export const viewFlowFile = createAction(`[${QUEUE_PREFIX}] View FlowFile`, props<{ request: ViewFlowFileRequest }>());
export const viewFlowFile = createAction(`${QUEUE_PREFIX} View FlowFile`, props<{ request: ViewFlowFileRequest }>());
export const openFlowFileDialog = createAction(
`[${QUEUE_PREFIX}] Open FlowFile Dialog`,
`${QUEUE_PREFIX} Open FlowFile Dialog`,
props<{ request: FlowFileDialogRequest }>()
);
export const downloadFlowFileContent = createAction(
`[${QUEUE_PREFIX}] Download FlowFile Content`,
`${QUEUE_PREFIX} Download FlowFile Content`,
props<{ request: DownloadFlowFileContentRequest }>()
);
export const viewFlowFileContent = createAction(
`[${QUEUE_PREFIX}] View FlowFile Content`,
`${QUEUE_PREFIX} View FlowFile Content`,
props<{ request: ViewFlowFileContentRequest }>()
);

View File

@ -16,24 +16,11 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as QueueListingActions from './queue-listing.actions';
import { Store } from '@ngrx/store';
import { CanvasState } from '../../../flow-designer/state';
import {
asyncScheduler,
catchError,
filter,
from,
interval,
map,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { selectConnectionIdFromRoute, selectListingRequestEntity } from './queue-listing.selectors';
import { QueueService } from '../../service/queue.service';
import { ListingRequest } from './index';
@ -131,7 +118,7 @@ export class QueueListingEffects {
resubmitQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.resubmitQueueListingRequest),
withLatestFrom(this.store.select(selectConnectionIdFromRoute)),
concatLatestFrom(() => this.store.select(selectConnectionIdFromRoute)),
switchMap(([action, connectionId]) =>
of(QueueListingActions.submitQueueListingRequest({ request: { connectionId } }))
)
@ -168,7 +155,7 @@ export class QueueListingEffects {
pollQueueListingRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(QueueListingActions.pollQueueListingRequest),
withLatestFrom(this.store.select(selectListingRequestEntity).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectListingRequestEntity).pipe(isDefinedAndNotNull())),
switchMap(([action, requestEntity]) => {
return from(this.queueService.pollQueueListingRequest(requestEntity.listingRequest)).pipe(
map((response) =>
@ -210,7 +197,7 @@ export class QueueListingEffects {
() =>
this.actions$.pipe(
ofType(QueueListingActions.deleteQueueListingRequest),
withLatestFrom(this.store.select(selectListingRequestEntity)),
concatLatestFrom(() => this.store.select(selectListingRequestEntity)),
tap(([action, requestEntity]) => {
this.dialog.closeAll();
@ -252,7 +239,7 @@ export class QueueListingEffects {
this.actions$.pipe(
ofType(QueueListingActions.openFlowFileDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout)),
concatLatestFrom(() => this.store.select(selectAbout)),
filter((about) => about != null),
tap(([request, about]) => {
const dialogReference = this.dialog.open(FlowFileDialog, {
@ -303,7 +290,7 @@ export class QueueListingEffects {
this.actions$.pipe(
ofType(QueueListingActions.viewFlowFileContent),
map((action) => action.request),
withLatestFrom(this.store.select(selectAbout).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectAbout).pipe(isDefinedAndNotNull())),
tap(([request, about]) => {
this.queueService.viewContent(request.flowfileSummary, about.contentViewerUrl);
})

View File

@ -16,9 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as FlowAnalysisRuleActions from './flow-analysis-rules.actions';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap, withLatestFrom } from 'rxjs';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
@ -87,7 +87,7 @@ export class FlowAnalysisRulesEffects {
() =>
this.actions$.pipe(
ofType(FlowAnalysisRuleActions.openNewFlowAnalysisRuleDialog),
withLatestFrom(this.store.select(selectFlowAnalysisRuleTypes)),
concatLatestFrom(() => this.store.select(selectFlowAnalysisRuleTypes)),
tap(([action, flowAnalysisRuleTypes]) => {
this.dialog.open(CreateFlowAnalysisRule, {
data: {

View File

@ -16,9 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ManagementControllerServicesActions from './management-controller-services.actions';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap, withLatestFrom } from 'rxjs';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { ManagementControllerServiceService } from '../../service/management-controller-service.service';
import { Store } from '@ngrx/store';
@ -87,7 +87,7 @@ export class ManagementControllerServicesEffects {
() =>
this.actions$.pipe(
ofType(ManagementControllerServicesActions.openNewControllerServiceDialog),
withLatestFrom(this.store.select(selectControllerServiceTypes)),
concatLatestFrom(() => this.store.select(selectControllerServiceTypes)),
tap(([action, controllerServiceTypes]) => {
const dialogReference = this.dialog.open(CreateControllerService, {
data: {

View File

@ -16,9 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as RegistryClientsActions from './registry-clients.actions';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap, withLatestFrom } from 'rxjs';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
@ -86,7 +86,7 @@ export class RegistryClientsEffects {
() =>
this.actions$.pipe(
ofType(RegistryClientsActions.openNewRegistryClientDialog),
withLatestFrom(this.store.select(selectRegistryClientTypes)),
concatLatestFrom(() => this.store.select(selectRegistryClientTypes)),
tap(([action, registryClientTypes]) => {
const dialogReference = this.dialog.open(CreateRegistryClient, {
data: {

View File

@ -16,9 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ReportingTaskActions from './reporting-tasks.actions';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap, withLatestFrom } from 'rxjs';
import { catchError, from, map, NEVER, Observable, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
@ -87,7 +87,7 @@ export class ReportingTasksEffects {
() =>
this.actions$.pipe(
ofType(ReportingTaskActions.openNewReportingTaskDialog),
withLatestFrom(this.store.select(selectReportingTaskTypes)),
concatLatestFrom(() => this.store.select(selectReportingTaskTypes)),
tap(([action, reportingTaskTypes]) => {
this.dialog.open(CreateReportingTask, {
data: {

View File

@ -16,25 +16,12 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import { Router } from '@angular/router';
import * as UserListingActions from './user-listing.actions';
import {
catchError,
combineLatest,
filter,
from,
map,
mergeMap,
of,
switchMap,
take,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { catchError, combineLatest, filter, from, map, mergeMap, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { UsersService } from '../../service/users.service';
import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
@ -102,7 +89,7 @@ export class UserListingEffects {
() =>
this.actions$.pipe(
ofType(UserListingActions.openCreateTenantDialog),
withLatestFrom(this.store.select(selectUsers), this.store.select(selectUserGroups)),
concatLatestFrom(() => [this.store.select(selectUsers), this.store.select(selectUserGroups)]),
tap(([action, existingUsers, existingUserGroups]) => {
const editTenantRequest: EditTenantRequest = {
existingUsers,
@ -178,7 +165,7 @@ export class UserListingEffects {
this.actions$.pipe(
ofType(UserListingActions.createUserSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectUserGroups)),
concatLatestFrom(() => this.store.select(selectUserGroups)),
switchMap(([response, userGroups]) => {
if (response.userGroupUpdate) {
const userGroupUpdate = response.userGroupUpdate;
@ -311,7 +298,7 @@ export class UserListingEffects {
this.actions$.pipe(
ofType(UserListingActions.openConfigureUserDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectUsers), this.store.select(selectUserGroups)),
concatLatestFrom(() => [this.store.select(selectUsers), this.store.select(selectUserGroups)]),
tap(([request, existingUsers, existingUserGroups]) => {
const editTenantRequest: EditTenantRequest = {
user: request.user,
@ -408,7 +395,7 @@ export class UserListingEffects {
this.actions$.pipe(
ofType(UserListingActions.updateUserSuccess),
map((action) => action.response),
withLatestFrom(this.store.select(selectUserGroups)),
concatLatestFrom(() => this.store.select(selectUserGroups)),
switchMap(([response, userGroups]) => {
if (response.userGroupUpdate) {
const userGroupUpdate = response.userGroupUpdate;
@ -533,7 +520,7 @@ export class UserListingEffects {
this.actions$.pipe(
ofType(UserListingActions.openConfigureUserGroupDialog),
map((action) => action.request),
withLatestFrom(this.store.select(selectUsers), this.store.select(selectUserGroups)),
concatLatestFrom(() => [this.store.select(selectUsers), this.store.select(selectUserGroups)]),
tap(([request, existingUsers, existingUserGroups]) => {
const editTenantRequest: EditTenantRequest = {
userGroup: request.userGroup,

View File

@ -16,21 +16,9 @@
*/
import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import * as ControllerServiceActions from './controller-service-state.actions';
import {
asyncScheduler,
catchError,
filter,
from,
interval,
map,
of,
switchMap,
takeUntil,
tap,
withLatestFrom
} from 'rxjs';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, takeUntil, tap } from 'rxjs';
import { Store } from '@ngrx/store';
import { NiFiState } from '../index';
import { selectControllerService, selectControllerServiceSetEnableRequest } from './controller-service-state.selectors';
@ -53,7 +41,7 @@ export class ControllerServiceStateEffects {
this.actions$.pipe(
ofType(ControllerServiceActions.submitEnableRequest),
map((action) => action.request),
withLatestFrom(this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
switchMap(([request, controllerService]) => {
if (
request.scope === 'SERVICE_AND_REFERENCING_COMPONENTS' &&
@ -77,7 +65,7 @@ export class ControllerServiceStateEffects {
submitDisableRequest$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.submitDisableRequest),
withLatestFrom(this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
concatLatestFrom(() => this.store.select(selectControllerService).pipe(isDefinedAndNotNull())),
switchMap(([request, controllerService]) => {
if (this.hasUnauthorizedReferences(controllerService.component.referencingComponents)) {
return of(
@ -98,10 +86,10 @@ export class ControllerServiceStateEffects {
setEnableControllerService$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.setEnableControllerService),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectControllerService),
this.store.select(selectControllerServiceSetEnableRequest)
),
]),
switchMap(([request, controllerService, setEnableRequest]) => {
if (controllerService) {
return from(
@ -163,10 +151,10 @@ export class ControllerServiceStateEffects {
pollControllerService$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.pollControllerService),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectControllerService).pipe(isDefinedAndNotNull()),
this.store.select(selectControllerServiceSetEnableRequest)
),
]),
switchMap(([action, controllerService, setEnableRequest]) =>
from(this.controllerServiceStateService.getControllerService(controllerService.id)).pipe(
map((response) =>
@ -229,10 +217,10 @@ export class ControllerServiceStateEffects {
updateReferencingServices$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.updateReferencingServices),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectControllerService),
this.store.select(selectControllerServiceSetEnableRequest)
),
]),
switchMap(([action, controllerService, setEnableRequest]) => {
if (controllerService) {
return from(
@ -275,10 +263,10 @@ export class ControllerServiceStateEffects {
updateReferencingComponents$ = createEffect(() =>
this.actions$.pipe(
ofType(ControllerServiceActions.updateReferencingComponents),
withLatestFrom(
concatLatestFrom(() => [
this.store.select(selectControllerService),
this.store.select(selectControllerServiceSetEnableRequest)
),
]),
switchMap(([action, controllerService, setEnableRequest]) => {
if (controllerService) {
return from(