NIFI-7536: Fix to improve performance when determining the run status of processors when needing to wait for all processors to stop for updating parameter context, etc.

This commit is contained in:
Mark Payne 2020-06-12 17:32:14 -04:00
parent a1b245e051
commit c9d08a76b1
16 changed files with 560 additions and 100 deletions

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlType;
import java.util.Set;
@XmlType(name = "runStatusDetails")
public class ProcessorRunStatusDetailsDTO {
public static final String VALID = "VALID";
public static final String RUNNING = "RUNNING";
public static final String STOPPED = "STOPPED";
public static final String INVALID = "INVALID";
public static final String VALIDATING = "VALIDATING";
public static final String DISABLED = "DISABLED";
private String id;
private String name;
private String runStatus;
private int activeThreads;
private Set<String> validationErrors;
@ApiModelProperty("The ID of the processor")
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
@ApiModelProperty("The name of the processor")
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
@ApiModelProperty(
value = "The run status of the processor",
allowableValues = RUNNING + ", " + STOPPED + ", " + INVALID + ", " + VALIDATING + ", " + VALID + ", " + DISABLED
)
public String getRunStatus() {
return runStatus;
}
public void setRunStatus(final String runStatus) {
this.runStatus = runStatus;
}
@ApiModelProperty("The current number of threads that the processor is currently using")
public int getActiveThreadCount() {
return activeThreads;
}
public void setActiveThreadCount(final int activeThreads) {
this.activeThreads = activeThreads;
}
@ApiModelProperty("The processor's validation errors")
public Set<String> getValidationErrors() {
return validationErrors;
}
public void setValidationErrors(final Set<String> validationErrors) {
this.validationErrors = validationErrors;
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import javax.xml.bind.annotation.XmlType;
@XmlType(name = "processorRunStatusDetails")
public class ProcessorRunStatusDetailsEntity extends Entity {
private RevisionDTO revision;
private PermissionsDTO permissions;
private ProcessorRunStatusDetailsDTO runStatusDetails;
@ApiModelProperty("The revision for the Processor.")
public RevisionDTO getRevision() {
return revision;
}
public void setRevision(final RevisionDTO revision) {
this.revision = revision;
}
@ApiModelProperty("The permissions for the Processor.")
public PermissionsDTO getPermissions() {
return permissions;
}
public void setPermissions(final PermissionsDTO permissions) {
this.permissions = permissions;
}
@ApiModelProperty("The details of a Processor's run status")
public ProcessorRunStatusDetailsDTO getRunStatusDetails() {
return runStatusDetails;
}
public void setRunStatusDetails(final ProcessorRunStatusDetailsDTO runStatusDetails) {
this.runStatusDetails = runStatusDetails;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.List;
@XmlRootElement(name="processorsRunStatusDetails")
public class ProcessorsRunStatusDetailsEntity extends Entity {
private List<ProcessorRunStatusDetailsEntity> runStatusDetails;
public List<ProcessorRunStatusDetailsEntity> getRunStatusDetails() {
return runStatusDetails;
}
public void setRunStatusDetails(final List<ProcessorRunStatusDetailsEntity> runStatusDetails) {
this.runStatusDetails = runStatusDetails;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Set;
@XmlRootElement(name = "runStatusDetailsRequest")
public class RunStatusDetailsRequestEntity extends Entity {
private Set<String> processorIds;
@ApiModelProperty("The IDs of all processors whose run status details should be provided")
public Set<String> getProcessorIds() {
return processorIds;
}
public void setProcessorIds(final Set<String> processorIds) {
this.processorIds = processorIds;
}
}

View File

@ -55,6 +55,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointM
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorDiagnosticsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorRunStatusDetailsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorTypesEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
@ -115,6 +116,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
endpointMergers.add(new ProcessorEndpointMerger());
endpointMergers.add(new ProcessorsEndpointMerger());
endpointMergers.add(new ProcessorRunStatusDetailsEndpointMerger());
endpointMergers.add(new ConnectionEndpointMerger());
endpointMergers.add(new ConnectionsEndpointMerger());
endpointMergers.add(new PortEndpointMerger());

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.PermissionsDtoMerger;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import java.net.URI;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class ProcessorRunStatusDetailsEndpointMerger implements EndpointResponseMerger {
public static final String RUN_STATUS_DETAILS_URI = "/nifi-api/processors/run-status-details/queries";
@Override
public boolean canHandle(final URI uri, final String method) {
return "POST".equalsIgnoreCase(method) && RUN_STATUS_DETAILS_URI.equals(uri.getPath());
}
@Override
public NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
if (!canHandle(uri, method)) {
throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
}
final ProcessorsRunStatusDetailsEntity responseEntity = clientResponse.getClientResponse().readEntity(ProcessorsRunStatusDetailsEntity.class);
// Create mapping of Processor ID to its run status details.
final Map<String, ProcessorRunStatusDetailsEntity> runStatusDetailMap = responseEntity.getRunStatusDetails().stream()
.collect(Collectors.toMap(entity -> entity.getRunStatusDetails().getId(), entity -> entity));
for (final NodeResponse nodeResponse : successfulResponses) {
final ProcessorsRunStatusDetailsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity :
nodeResponse.getClientResponse().readEntity(ProcessorsRunStatusDetailsEntity.class);
for (final ProcessorRunStatusDetailsEntity processorEntity : nodeResponseEntity.getRunStatusDetails()) {
final String processorId = processorEntity.getRunStatusDetails().getId();
final ProcessorRunStatusDetailsEntity mergedEntity = runStatusDetailMap.computeIfAbsent(processorId, id -> new ProcessorRunStatusDetailsEntity());
merge(mergedEntity, processorEntity);
}
}
final ProcessorsRunStatusDetailsEntity mergedEntity = new ProcessorsRunStatusDetailsEntity();
mergedEntity.setRunStatusDetails(new ArrayList<>(runStatusDetailMap.values()));
return new NodeResponse(clientResponse, mergedEntity);
}
private void merge(final ProcessorRunStatusDetailsEntity target, final ProcessorRunStatusDetailsEntity additional) {
PermissionsDtoMerger.mergePermissions(target.getPermissions(), additional.getPermissions());
final ProcessorRunStatusDetailsDTO targetRunStatusDetailsDto = target.getRunStatusDetails();
final ProcessorRunStatusDetailsDTO additionalRunStatusDetailsDto = additional.getRunStatusDetails();
// If any node indicates that we do not have read permissions, null out both the name and validation errors.
if (!additional.getPermissions().getCanRead()) {
targetRunStatusDetailsDto.setName(null);
targetRunStatusDetailsDto.setValidationErrors(null);
}
targetRunStatusDetailsDto.setActiveThreadCount(targetRunStatusDetailsDto.getActiveThreadCount() + additionalRunStatusDetailsDto.getActiveThreadCount());
// if the status to merge is validating/invalid allow it to take precedence. whether the
// processor run status is disabled/stopped/running is part of the flow configuration
// and should not differ amongst nodes. however, whether a processor is validating/invalid
// can be driven by environmental conditions. this check allows any of those to
// take precedence over the configured run status.
final String additionalRunStatus = additionalRunStatusDetailsDto.getRunStatus();
if (RunStatus.Invalid.name().equals(additionalRunStatus) || RunStatus.Validating.name().equals(additionalRunStatus)) {
targetRunStatusDetailsDto.setRunStatus(additionalRunStatus);
}
final Set<String> additionalValidationErrors = additionalRunStatusDetailsDto.getValidationErrors();
if (targetRunStatusDetailsDto.getValidationErrors() != null && additionalValidationErrors != null) {
targetRunStatusDetailsDto.getValidationErrors().addAll(additionalValidationErrors);
}
}
}

View File

@ -32,17 +32,17 @@ public interface FlowFileEventRepository extends Closeable {
/**
* @param now the current time
* @return a report of processing activity since the given time
* @return a report of processing activity
*/
RepositoryStatusReport reportTransferEvents(long now);
/**
* Causes any flow file events of the given entry age in epoch milliseconds
* or older to be purged from the repository
*
* @param cutoffEpochMilliseconds cutoff
* Reports events for a given component
* @param componentId the ID of the component
* @param now the current time
* @return a report of processing activity
*/
void purgeTransferEvents(long cutoffEpochMilliseconds);
FlowFileEvent reportTransferEvents(String componentId, long now);
/**
* Causes any flow file events of the given component to be purged from the

View File

@ -19,6 +19,7 @@ package org.apache.nifi.reporting;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
public interface UserAwareEventAccess extends EventAccess {
/**
@ -31,6 +32,14 @@ public interface UserAwareEventAccess extends EventAccess {
*/
ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user, int recursiveStatusDepth);
/**
* Returns the status for the processor with the given ID
*
* @param processorId the ID of the processor
* @param user the user performing the request
* @return the status for the processor with the given ID, or <code>null</code> if the processor cannot be found
*/
ProcessorStatus getProcessorStatus(String processorId, NiFiUser user);
/**
* Returns the status for the components in the specified group with the

View File

@ -51,12 +51,9 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
}
@Override
public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
// This is done so that if a processor is removed from the graph, its events
// will be removed rather than being kept in memory
for (final EventContainer container : componentEventMap.values()) {
container.purgeEvents(cutoffEpochMilliseconds);
}
public FlowFileEvent reportTransferEvents(final String componentId, final long now) {
final EventContainer container = componentEventMap.get(componentId);
return container == null ? null : container.generateReport(now);
}
@Override

View File

@ -144,6 +144,21 @@ public class StandardEventAccess implements UserAwareEventAccess {
return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
}
@Override
public ProcessorStatus getProcessorStatus(final String processorId, final NiFiUser user) {
final ProcessorNode procNode = flowController.getFlowManager().getProcessorNode(processorId);
if (procNode == null) {
return null;
}
FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorId, System.currentTimeMillis());
if (flowFileEvent == null) {
flowFileEvent = EmptyFlowFileEvent.INSTANCE;
}
final Predicate<Authorizable> authorizer = authorizable -> authorizable.isAuthorized(flowController.getAuthorizer(), RequestAction.READ, user);
return getProcessorStatus(flowFileEvent, procNode, authorizer);
}
/**
* Returns the status for components in the specified group. This request is
@ -659,6 +674,11 @@ public class StandardEventAccess implements UserAwareEventAccess {
}
private ProcessorStatus getProcessorStatus(final RepositoryStatusReport report, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) {
final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
return getProcessorStatus(entry, procNode, isAuthorized);
}
private ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) {
final boolean isProcessorAuthorized = isAuthorized.evaluate(procNode);
final ProcessScheduler processScheduler = flowController.getProcessScheduler();
@ -669,37 +689,36 @@ public class StandardEventAccess implements UserAwareEventAccess {
status.setName(isProcessorAuthorized ? procNode.getName() : procNode.getIdentifier());
status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor");
final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) {
final int processedCount = entry.getFlowFilesOut();
final long numProcessedBytes = entry.getContentSizeOut();
if (flowFileEvent != null && flowFileEvent != EmptyFlowFileEvent.INSTANCE) {
final int processedCount = flowFileEvent.getFlowFilesOut();
final long numProcessedBytes = flowFileEvent.getContentSizeOut();
status.setOutputBytes(numProcessedBytes);
status.setOutputCount(processedCount);
final int inputCount = entry.getFlowFilesIn();
final long inputBytes = entry.getContentSizeIn();
final int inputCount = flowFileEvent.getFlowFilesIn();
final long inputBytes = flowFileEvent.getContentSizeIn();
status.setInputBytes(inputBytes);
status.setInputCount(inputCount);
final long readBytes = entry.getBytesRead();
final long readBytes = flowFileEvent.getBytesRead();
status.setBytesRead(readBytes);
final long writtenBytes = entry.getBytesWritten();
final long writtenBytes = flowFileEvent.getBytesWritten();
status.setBytesWritten(writtenBytes);
status.setProcessingNanos(entry.getProcessingNanoseconds());
status.setInvocations(entry.getInvocations());
status.setProcessingNanos(flowFileEvent.getProcessingNanoseconds());
status.setInvocations(flowFileEvent.getInvocations());
status.setAverageLineageDuration(entry.getAverageLineageMillis());
status.setAverageLineageDuration(flowFileEvent.getAverageLineageMillis());
status.setFlowFilesReceived(entry.getFlowFilesReceived());
status.setBytesReceived(entry.getBytesReceived());
status.setFlowFilesSent(entry.getFlowFilesSent());
status.setBytesSent(entry.getBytesSent());
status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
status.setFlowFilesReceived(flowFileEvent.getFlowFilesReceived());
status.setBytesReceived(flowFileEvent.getBytesReceived());
status.setFlowFilesSent(flowFileEvent.getFlowFilesSent());
status.setBytesSent(flowFileEvent.getBytesSent());
status.setFlowFilesRemoved(flowFileEvent.getFlowFilesRemoved());
if (isProcessorAuthorized) {
status.setCounters(entry.getCounters());
status.setCounters(flowFileEvent.getCounters());
}
}

View File

@ -108,6 +108,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
@ -600,6 +601,13 @@ public interface NiFiServiceFacade {
*/
Set<ProcessorEntity> getProcessors(String groupId, boolean includeDescendants);
/**
* Provides a ProcessorsRunStatusDetails that describes the current details of the run status for each processor whose id is provided
* @param processorIds the set of all processor IDs that should be included
* @return a ProcessorsRunStatusDetailsEntity that describes the current information about the processors' run status
*/
ProcessorsRunStatusDetailsEntity getProcessorsRunStatusDetails(Set<String> processorIds, NiFiUser user);
/**
* Verifies the specified processor can be updated.
*

View File

@ -195,6 +195,7 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
import org.apache.nifi.web.api.dto.RegistryDTO;
@ -268,6 +269,8 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
@ -3320,7 +3323,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities);
final ProcessorDTO processorDTO = dtoFactory.createProcessorDto(processor);
return entityFactory.createProcessorEntity(processorDTO, revision, permissions, operatePermissions, status, bulletinEntities);
}
@Override
@ -3332,6 +3336,38 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
.collect(Collectors.toSet());
}
@Override
public ProcessorsRunStatusDetailsEntity getProcessorsRunStatusDetails(final Set<String> processorIds, final NiFiUser user) {
final List<ProcessorRunStatusDetailsEntity> runStatusDetails = processorIds.stream()
.map(processorDAO::getProcessor)
.map(processor -> createRunStatusDetailsEntity(processor, user))
.collect(Collectors.toList());
final ProcessorsRunStatusDetailsEntity entity = new ProcessorsRunStatusDetailsEntity();
entity.setRunStatusDetails(runStatusDetails);
return entity;
}
private ProcessorRunStatusDetailsEntity createRunStatusDetailsEntity(final ProcessorNode processor, final NiFiUser user) {
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(processor.getIdentifier());
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = dtoFactory.createProcessorRunStatusDetailsDto(processor, processorStatus);
if (!permissions.getCanRead()) {
runStatusDetailsDto.setName(null);
runStatusDetailsDto.setValidationErrors(null);
}
final ProcessorRunStatusDetailsEntity entity = new ProcessorRunStatusDetailsEntity();
entity.setPermissions(permissions);
entity.setRevision(revision);
entity.setRunStatusDetails(runStatusDetailsDto);
return entity;
}
@Override
public TemplateDTO exportTemplate(final String id) {
final Template template = templateDAO.getTemplate(id);

View File

@ -47,6 +47,8 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RunStatusDetailsRequestEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
@ -212,6 +214,50 @@ public class ProcessorResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/run-status-details/queries")
@ApiOperation(
value = "Submits a query to retrieve the run status details of all processors that are in the given list of Processor IDs",
response = ProcessorsRunStatusDetailsEntity.class,
authorizations = {
@Authorization(value = "Read - /processors/{uuid} for each processor whose run status information is requested")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessorRunStatusDetails(
@ApiParam(value = "The request for the processors that should be included in the results")
final RunStatusDetailsRequestEntity requestEntity) {
if (requestEntity.getProcessorIds() == null) {
throw new IllegalArgumentException("List of Processor IDs must be provided");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestEntity);
}
return withWriteLock(serviceFacade,
requestEntity,
lookup -> {},
null,
providedEntity -> {
final ProcessorsRunStatusDetailsEntity entity = serviceFacade.getProcessorsRunStatusDetails(requestEntity.getProcessorIds(), NiFiUserUtils.getNiFiUser());
return generateOkResponse(entity).build();
});
}
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)

View File

@ -1284,6 +1284,32 @@ public final class DtoFactory {
return dto;
}
public ProcessorRunStatusDetailsDTO createProcessorRunStatusDetailsDto(final ProcessorNode processor, final ProcessorStatus processorStatus) {
final ProcessorRunStatusDetailsDTO dto = new ProcessorRunStatusDetailsDTO();
dto.setId(processor.getIdentifier());
dto.setName(processor.getName());
dto.setActiveThreadCount(processorStatus.getActiveThreadCount());
dto.setRunStatus(processorStatus.getRunStatus().name());
dto.setValidationErrors(convertValidationErrors(processor.getValidationErrors()));
return dto;
}
private Set<String> convertValidationErrors(final Collection<ValidationResult> validationErrors) {
if (validationErrors == null) {
return null;
}
if (validationErrors.isEmpty()) {
return Collections.emptySet();
}
final Set<String> errors = new HashSet<>(validationErrors.size());
for (final ValidationResult result : validationErrors) {
errors.add(result.toString());
}
return errors;
}
/**
* Creates a PortStatusDTO for the specified PortStatus.
*
@ -3085,7 +3111,6 @@ public final class DtoFactory {
/**
* Creates a ProcessorDTO from the specified ProcessorNode.
*
* @param node node
* @return dto
*/
@ -3130,7 +3155,7 @@ public final class DtoFactory {
}
// sort the relationships
Collections.sort(relationships, new Comparator<RelationshipDTO>() {
relationships.sort(new Comparator<RelationshipDTO>() {
@Override
public int compare(final RelationshipDTO r1, final RelationshipDTO r2) {
return Collator.getInstance(Locale.US).compare(r1.getName(), r2.getName());
@ -3144,9 +3169,10 @@ public final class DtoFactory {
dto.setSupportsParallelProcessing(!node.isTriggeredSerially());
dto.setSupportsEventDriven(node.isEventDrivenSupported());
dto.setSupportsBatching(node.isSessionBatchingSupported());
dto.setConfig(createProcessorConfigDto(node));
final ValidationStatus validationStatus = node.getValidationStatus(1, TimeUnit.MILLISECONDS);
final ValidationStatus validationStatus = node.getValidationStatus();
dto.setValidationStatus(validationStatus.name());
final Collection<ValidationResult> validationErrors = node.getValidationErrors();

View File

@ -634,22 +634,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified processor
*/
public ProcessorStatus getProcessorStatus(final String processorId) {
final ProcessGroup root = getRootGroup();
final ProcessorNode processor = root.findProcessor(processorId);
// ensure the processor was found
if (processor == null) {
throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
}
// calculate the process group status
final String groupId = processor.getProcessGroup().getIdentifier();
final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
final ProcessorStatus status = processGroupStatus.getProcessorStatus().stream().filter(processorStatus -> processorId.equals(processorStatus.getId())).findFirst().orElse(null);
final ProcessorStatus status = flowController.getEventAccess().getProcessorStatus(processorId, NiFiUserUtils.getNiFiUser());
if (status == null) {
throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
}

View File

@ -32,16 +32,16 @@ import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RunStatusDetailsRequestEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +53,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -171,13 +172,17 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
originalUri.getPort(), "/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity();
final Set<String> processorIds = processors.values().stream()
.map(AffectedComponentEntity::getId)
.collect(Collectors.toSet());
requestEntity.setProcessorIds(processorIds);
boolean continuePolling = true;
while (continuePolling) {
@ -185,20 +190,19 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class);
if (isProcessorValidationComplete(processorEntities, processors)) {
if (isProcessorValidationComplete(runStatusDetailsEntity, processors)) {
logger.debug("All {} processors of interest now have been validated", processors.size());
return true;
}
@ -210,14 +214,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
return false;
}
private boolean isProcessorValidationComplete(Set<ProcessorEntity> processorEntities, Map<String, AffectedComponentEntity> affectedComponents) {
updateAffectedProcessors(processorEntities, affectedComponents);
for (final ProcessorEntity entity : processorEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
private boolean isProcessorValidationComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map<String, AffectedComponentEntity> affectedComponents) {
updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents);
for (final ProcessorRunStatusDetailsEntity statusDetailsEntity : runStatusDetailsEntity.getRunStatusDetails()) {
final ProcessorRunStatusDetailsDTO runStatusDetails = statusDetailsEntity.getRunStatusDetails();
if (!affectedComponents.containsKey(runStatusDetails.getId())) {
continue;
}
if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
if (ProcessorRunStatusDetailsDTO.VALIDATING.equals(runStatusDetails.getRunStatus())) {
return false;
}
}
@ -241,14 +247,20 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(),
"/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
final Set<String> processorIds = processors.values().stream()
.map(AffectedComponentEntity::getId)
.collect(Collectors.toSet());
final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity();
requestEntity.setProcessorIds(processorIds);
boolean continuePolling = true;
while (continuePolling) {
@ -256,20 +268,18 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
if (isProcessorActionComplete(processorEntities, processors, desiredState, invalidComponentAction)) {
final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class);
if (isProcessorActionComplete(runStatusDetailsEntity, processors, desiredState, invalidComponentAction)) {
logger.debug("All {} processors of interest now have the desired state of {}", processors.size(), desiredState);
return true;
}
@ -298,41 +308,44 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
return entity;
}
private void updateAffectedProcessors(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
private void updateAffectedProcessors(final Collection<ProcessorRunStatusDetailsEntity> runStatusDetailsEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
// update the affected processors
processorEntities.stream()
.filter(entity -> affectedComponents.containsKey(entity.getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId());
affectedComponentEntity.setRevision(entity.getRevision());
runStatusDetailsEntities.stream()
.filter(entity -> affectedComponents.containsKey(entity.getRunStatusDetails().getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getRunStatusDetails().getId());
affectedComponentEntity.setRevision(entity.getRevision());
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails();
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
}
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
affectedComponent.setState(runStatusDetailsDto.getRunStatus());
affectedComponent.setActiveThreadCount(runStatusDetailsDto.getActiveThreadCount());
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
affectedComponent.setValidationErrors(runStatusDetailsDto.getValidationErrors());
}
});
}
});
}
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
private boolean isProcessorActionComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map<String, AffectedComponentEntity> affectedComponents,
final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final String desiredStateName = desiredState.name();
updateAffectedProcessors(processorEntities, affectedComponents);
updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents);
for (final ProcessorEntity entity : processorEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
for (final ProcessorRunStatusDetailsEntity entity : runStatusDetailsEntity.getRunStatusDetails()) {
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails();
if (!affectedComponents.containsKey(runStatusDetailsDto.getId())) {
continue;
}
final ProcessorStatusDTO status = entity.getStatus();
if (ProcessorDTO.INVALID.equals(entity.getComponent().getValidationStatus())) {
if (ProcessorRunStatusDetailsDTO.INVALID.equals(runStatusDetailsDto.getRunStatus())) {
switch (invalidComponentAction) {
case WAIT:
return false;
@ -340,17 +353,17 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
continue;
case FAIL:
final String action = desiredState == ScheduledState.RUNNING ? "start" : "stop";
throw new LifecycleManagementException("Could not " + action + " " + entity.getComponent().getName() + " because it is invalid");
throw new LifecycleManagementException("Could not " + action + " " + runStatusDetailsDto.getName() + " because it is invalid");
}
}
final String runStatus = status.getAggregateSnapshot().getRunStatus();
final String runStatus = runStatusDetailsDto.getRunStatus();
final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) {
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
}