NIFI-13068: (#8668)

- Adding support to terminate threads for Processors that are no longer running.
- Updating import for concatLatestFrom.

This closes #8668
This commit is contained in:
Matt Gilman 2024-04-19 10:21:43 -04:00 committed by GitHub
parent b81203aac2
commit 8c3c1eea31
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 138 additions and 34 deletions

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import { Router } from '@angular/router';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import * as BulletinBoardActions from './bulletin-board.actions';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import * as CounterListingActions from './counter-listing.actions';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { ErrorHelper } from '../../../../service/error-helper.service';

View File

@ -56,7 +56,8 @@ import {
stopCurrentProcessGroup,
stopVersionControlRequest,
copy,
paste
paste,
terminateThreads
} from '../state/flow/flow.actions';
import { ComponentType } from '../../../state/shared';
import {
@ -639,14 +640,21 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider {
}
},
{
condition: (selection: any) => {
// TODO - canTerminate
return false;
condition: (selection: d3.Selection<any, any, any, any>) => {
return this.canvasUtils.canTerminate(selection);
},
clazz: 'fa fa-hourglass-end',
text: 'Terminate',
action: () => {
// TODO - terminate
action: (selection: d3.Selection<any, any, any, any>) => {
const d: any = selection.datum();
this.store.dispatch(
terminateThreads({
request: {
id: d.id,
uri: d.uri
}
})
);
}
},
{

View File

@ -1611,6 +1611,30 @@ export class CanvasUtils {
return selectionSize === writableSize;
}
/**
* Determines whether the selection represents a Processor that is no longer running but
* still has active threads.
*
* @param selection
*/
public canTerminate(selection: d3.Selection<any, any, any, any>): boolean {
if (selection.size() !== 1) {
return false;
}
if (!this.canOperate(selection)) {
return false;
}
if (this.isProcessor(selection)) {
const selectionData = selection.datum();
const aggregateSnapshot = selectionData.status.aggregateSnapshot;
return aggregateSnapshot.runStatus !== 'Running' && aggregateSnapshot.activeThreadCount > 0;
}
return false;
}
/**
* Determines whether the current selection supports starting flow versioning.
*

View File

@ -40,6 +40,7 @@ import {
StopComponentRequest,
StopProcessGroupRequest,
StopVersionControlRequest,
TerminateThreadsRequest,
UpdateComponentRequest,
UploadProcessGroupRequest,
VersionControlInformationEntity
@ -287,6 +288,10 @@ export class FlowService implements PropertyDescriptorRetriever {
return this.httpClient.put(`${this.nifiCommon.stripProtocol(request.uri)}/run-status`, stopRequest);
}
terminateThreads(request: TerminateThreadsRequest): Observable<any> {
return this.httpClient.delete(`${this.nifiCommon.stripProtocol(request.uri)}/threads`);
}
startProcessGroup(request: StartProcessGroupRequest): Observable<any> {
const startRequest: ProcessGroupRunStatusRequest = {
id: request.id,

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ControllerServicesActions from './controller-services.actions';
import { catchError, combineLatest, from, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -88,6 +88,7 @@ import {
StopProcessGroupResponse,
StopVersionControlRequest,
StopVersionControlResponse,
TerminateThreadsRequest,
UpdateComponentFailure,
UpdateComponentRequest,
UpdateComponentResponse,
@ -616,6 +617,11 @@ export const stopComponents = createAction(
props<{ request: StopComponentsRequest }>()
);
export const terminateThreads = createAction(
`${CANVAS_PREFIX} Terminate Threads`,
props<{ request: TerminateThreadsRequest }>()
);
export const stopComponentSuccess = createAction(
`${CANVAS_PREFIX} Stop Component Success`,
props<{ response: StopComponentResponse }>()

View File

@ -17,7 +17,8 @@
import { Injectable } from '@angular/core';
import { FlowService } from '../../service/flow.service';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as FlowActions from './flow.actions';
import * as StatusHistoryActions from '../../../../state/status-history/status-history.actions';
import * as ErrorActions from '../../../../state/error/error.actions';
@ -2582,6 +2583,36 @@ export class FlowEffects {
)
);
/**
* Terminates threads for the processor in the request.
*/
terminateThreads$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.terminateThreads),
map((action) => action.request),
switchMap((request) =>
from(this.flowService.terminateThreads(request)).pipe(
map((response) =>
FlowActions.updateComponentSuccess({
response: {
id: request.id,
type: ComponentType.Processor,
response
}
})
),
catchError((errorResponse: HttpErrorResponse) =>
of(
FlowActions.flowSnackbarError({
error: errorResponse.error
})
)
)
)
)
)
);
/**
* If the component stopped was the current process group, reload the flow
*/

View File

@ -724,6 +724,11 @@ export interface StopComponentsRequest {
components: StopComponentRequest[];
}
export interface TerminateThreadsRequest {
id: string;
uri: string;
}
export interface LoadChildProcessGroupRequest {
id: string;
}

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ManageRemotePortsActions from './manage-remote-ports.actions';
import { catchError, from, map, of, switchMap, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ParameterActions from './parameter.actions';
import { Store } from '@ngrx/store';
import { CanvasState } from '../index';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as QueueActions from './queue.actions';
import { Store } from '@ngrx/store';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, take, takeUntil, tap } from 'rxjs';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as TransformActions from './transform.actions';
import { map, tap } from 'rxjs';
import { Storage } from '../../../../service/storage.service';

View File

@ -64,7 +64,7 @@ import { initialState } from '../../state/flow/flow.reducer';
import { CanvasContextMenu } from '../../service/canvas-context-menu.service';
import { getStatusHistoryAndOpenDialog } from '../../../../state/status-history/status-history.actions';
import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.actions';
import { concatLatestFrom } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { selectUrl } from '../../../../state/router/router.selectors';
import { Storage } from '../../../../service/storage.service';
import {

View File

@ -46,7 +46,7 @@ import { NiFiCommon } from '../../../../service/nifi-common.service';
import { MatTableDataSource } from '@angular/material/table';
import { Sort } from '@angular/material/sort';
import { TextTip } from '../../../../ui/common/tooltips/text-tip/text-tip.component';
import { concatLatestFrom } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { loadFlowConfiguration } from '../../../../state/flow-configuration/flow-configuration.actions';
import {
selectFlowConfiguration,

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ParameterContextListingActions from './parameter-context-listing.actions';
import * as ErrorActions from '../../../../state/error/error.actions';
import {

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as LineageActions from './lineage.actions';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ProvenanceEventListingActions from './provenance-event-listing.actions';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as QueueListingActions from './queue-listing.actions';
import { Store } from '@ngrx/store';
import { CanvasState } from '../../../flow-designer/state';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as FlowAnalysisRuleActions from './flow-analysis-rules.actions';
import { catchError, from, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as GeneralActions from './general.actions';
import * as ErrorActions from '../../../../state/error/error.actions';
import { catchError, from, map, of, switchMap, tap } from 'rxjs';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ManagementControllerServicesActions from './management-controller-services.actions';
import * as ErrorActions from '../../../../state/error/error.actions';
import { catchError, from, map, of, switchMap, take, takeUntil, tap } from 'rxjs';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { Client } from '../../../../service/client.service';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as RegistryClientsActions from './registry-clients.actions';
import { catchError, from, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ReportingTaskActions from './reporting-tasks.actions';
import { catchError, from, map, of, switchMap, take, takeUntil, tap } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import { ErrorHelper } from '../../../../service/error-helper.service';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { Store } from '@ngrx/store';
import { NiFiState } from '../../../../state';
import { ProcessGroupStatusService } from '../../service/process-group-status.service';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { NiFiState } from '../../../../state';
import { Store } from '@ngrx/store';
import { Router } from '@angular/router';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ClusterSummaryActions from './cluster-summary.actions';
import { asyncScheduler, catchError, delay, filter, from, interval, map, of, switchMap, takeUntil, tap } from 'rxjs';
import { ClusterService } from '../../service/cluster.service';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import { Store } from '@ngrx/store';
import { NiFiState } from '../index';
import * as ComponentStateActions from './component-state.actions';

View File

@ -16,7 +16,8 @@
*/
import { Injectable } from '@angular/core';
import { Actions, concatLatestFrom, createEffect, ofType } from '@ngrx/effects';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { concatLatestFrom } from '@ngrx/operators';
import * as ControllerServiceActions from './controller-service-state.actions';
import { asyncScheduler, catchError, filter, from, interval, map, of, switchMap, takeUntil, tap } from 'rxjs';
import { Store } from '@ngrx/store';