diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorRunStatusDetailsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorRunStatusDetailsDTO.java new file mode 100644 index 0000000000..862c80d82e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorRunStatusDetailsDTO.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; +import java.util.Set; + +@XmlType(name = "runStatusDetails") +public class ProcessorRunStatusDetailsDTO { + public static final String VALID = "VALID"; + public static final String RUNNING = "RUNNING"; + public static final String STOPPED = "STOPPED"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; + public static final String DISABLED = "DISABLED"; + + + private String id; + private String name; + private String runStatus; + private int activeThreads; + private Set validationErrors; + + @ApiModelProperty("The ID of the processor") + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + @ApiModelProperty("The name of the processor") + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + @ApiModelProperty( + value = "The run status of the processor", + allowableValues = RUNNING + ", " + STOPPED + ", " + INVALID + ", " + VALIDATING + ", " + VALID + ", " + DISABLED + ) + public String getRunStatus() { + return runStatus; + } + + public void setRunStatus(final String runStatus) { + this.runStatus = runStatus; + } + + @ApiModelProperty("The current number of threads that the processor is currently using") + public int getActiveThreadCount() { + return activeThreads; + } + + public void setActiveThreadCount(final int activeThreads) { + this.activeThreads = activeThreads; + } + + + @ApiModelProperty("The processor's validation errors") + public Set getValidationErrors() { + return validationErrors; + } + + public void setValidationErrors(final Set validationErrors) { + this.validationErrors = validationErrors; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusDetailsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusDetailsEntity.java new file mode 100644 index 0000000000..438618aa19 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorRunStatusDetailsEntity.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "processorRunStatusDetails") +public class ProcessorRunStatusDetailsEntity extends Entity { + private RevisionDTO revision; + private PermissionsDTO permissions; + private ProcessorRunStatusDetailsDTO runStatusDetails; + + @ApiModelProperty("The revision for the Processor.") + public RevisionDTO getRevision() { + return revision; + } + + public void setRevision(final RevisionDTO revision) { + this.revision = revision; + } + + @ApiModelProperty("The permissions for the Processor.") + public PermissionsDTO getPermissions() { + return permissions; + } + + public void setPermissions(final PermissionsDTO permissions) { + this.permissions = permissions; + } + + @ApiModelProperty("The details of a Processor's run status") + public ProcessorRunStatusDetailsDTO getRunStatusDetails() { + return runStatusDetails; + } + + public void setRunStatusDetails(final ProcessorRunStatusDetailsDTO runStatusDetails) { + this.runStatusDetails = runStatusDetails; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorsRunStatusDetailsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorsRunStatusDetailsEntity.java new file mode 100644 index 0000000000..76ad0df6e6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorsRunStatusDetailsEntity.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; + +@XmlRootElement(name="processorsRunStatusDetails") +public class ProcessorsRunStatusDetailsEntity extends Entity { + private List runStatusDetails; + + public List getRunStatusDetails() { + return runStatusDetails; + } + + public void setRunStatusDetails(final List runStatusDetails) { + this.runStatusDetails = runStatusDetails; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RunStatusDetailsRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RunStatusDetailsRequestEntity.java new file mode 100644 index 0000000000..4090b23647 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RunStatusDetailsRequestEntity.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Set; + +@XmlRootElement(name = "runStatusDetailsRequest") +public class RunStatusDetailsRequestEntity extends Entity { + private Set processorIds; + + @ApiModelProperty("The IDs of all processors whose run status details should be provided") + public Set getProcessorIds() { + return processorIds; + } + + public void setProcessorIds(final Set processorIds) { + this.processorIds = processorIds; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 78cd8033f5..027ded4a11 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -55,6 +55,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointM import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorDiagnosticsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorRunStatusDetailsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger; @@ -115,6 +116,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger()); endpointMergers.add(new ProcessorEndpointMerger()); endpointMergers.add(new ProcessorsEndpointMerger()); + endpointMergers.add(new ProcessorRunStatusDetailsEndpointMerger()); endpointMergers.add(new ConnectionEndpointMerger()); endpointMergers.add(new ConnectionsEndpointMerger()); endpointMergers.add(new PortEndpointMerger()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorRunStatusDetailsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorRunStatusDetailsEndpointMerger.java new file mode 100644 index 0000000000..153438c497 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorRunStatusDetailsEndpointMerger.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.PermissionsDtoMerger; +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO; +import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class ProcessorRunStatusDetailsEndpointMerger implements EndpointResponseMerger { + public static final String RUN_STATUS_DETAILS_URI = "/nifi-api/processors/run-status-details/queries"; + + @Override + public boolean canHandle(final URI uri, final String method) { + return "POST".equalsIgnoreCase(method) && RUN_STATUS_DETAILS_URI.equals(uri.getPath()); + } + + @Override + public NodeResponse merge(final URI uri, final String method, final Set successfulResponses, final Set problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ProcessorsRunStatusDetailsEntity responseEntity = clientResponse.getClientResponse().readEntity(ProcessorsRunStatusDetailsEntity.class); + + // Create mapping of Processor ID to its run status details. + final Map runStatusDetailMap = responseEntity.getRunStatusDetails().stream() + .collect(Collectors.toMap(entity -> entity.getRunStatusDetails().getId(), entity -> entity)); + + for (final NodeResponse nodeResponse : successfulResponses) { + final ProcessorsRunStatusDetailsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : + nodeResponse.getClientResponse().readEntity(ProcessorsRunStatusDetailsEntity.class); + + for (final ProcessorRunStatusDetailsEntity processorEntity : nodeResponseEntity.getRunStatusDetails()) { + final String processorId = processorEntity.getRunStatusDetails().getId(); + + final ProcessorRunStatusDetailsEntity mergedEntity = runStatusDetailMap.computeIfAbsent(processorId, id -> new ProcessorRunStatusDetailsEntity()); + merge(mergedEntity, processorEntity); + } + } + + final ProcessorsRunStatusDetailsEntity mergedEntity = new ProcessorsRunStatusDetailsEntity(); + mergedEntity.setRunStatusDetails(new ArrayList<>(runStatusDetailMap.values())); + return new NodeResponse(clientResponse, mergedEntity); + } + + private void merge(final ProcessorRunStatusDetailsEntity target, final ProcessorRunStatusDetailsEntity additional) { + PermissionsDtoMerger.mergePermissions(target.getPermissions(), additional.getPermissions()); + + final ProcessorRunStatusDetailsDTO targetRunStatusDetailsDto = target.getRunStatusDetails(); + final ProcessorRunStatusDetailsDTO additionalRunStatusDetailsDto = additional.getRunStatusDetails(); + + // If any node indicates that we do not have read permissions, null out both the name and validation errors. + if (!additional.getPermissions().getCanRead()) { + targetRunStatusDetailsDto.setName(null); + targetRunStatusDetailsDto.setValidationErrors(null); + } + + targetRunStatusDetailsDto.setActiveThreadCount(targetRunStatusDetailsDto.getActiveThreadCount() + additionalRunStatusDetailsDto.getActiveThreadCount()); + + // if the status to merge is validating/invalid allow it to take precedence. whether the + // processor run status is disabled/stopped/running is part of the flow configuration + // and should not differ amongst nodes. however, whether a processor is validating/invalid + // can be driven by environmental conditions. this check allows any of those to + // take precedence over the configured run status. + final String additionalRunStatus = additionalRunStatusDetailsDto.getRunStatus(); + if (RunStatus.Invalid.name().equals(additionalRunStatus) || RunStatus.Validating.name().equals(additionalRunStatus)) { + targetRunStatusDetailsDto.setRunStatus(additionalRunStatus); + } + + final Set additionalValidationErrors = additionalRunStatusDetailsDto.getValidationErrors(); + if (targetRunStatusDetailsDto.getValidationErrors() != null && additionalValidationErrors != null) { + targetRunStatusDetailsDto.getValidationErrors().addAll(additionalValidationErrors); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java index 8fac237add..be26cd86d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java @@ -32,17 +32,17 @@ public interface FlowFileEventRepository extends Closeable { /** * @param now the current time - * @return a report of processing activity since the given time + * @return a report of processing activity */ RepositoryStatusReport reportTransferEvents(long now); /** - * Causes any flow file events of the given entry age in epoch milliseconds - * or older to be purged from the repository - * - * @param cutoffEpochMilliseconds cutoff + * Reports events for a given component + * @param componentId the ID of the component + * @param now the current time + * @return a report of processing activity */ - void purgeTransferEvents(long cutoffEpochMilliseconds); + FlowFileEvent reportTransferEvents(String componentId, long now); /** * Causes any flow file events of the given component to be purged from the diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java index 32b2e01840..0fb43e093f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java @@ -19,6 +19,7 @@ package org.apache.nifi.reporting; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; public interface UserAwareEventAccess extends EventAccess { /** @@ -31,6 +32,14 @@ public interface UserAwareEventAccess extends EventAccess { */ ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user, int recursiveStatusDepth); + /** + * Returns the status for the processor with the given ID + * + * @param processorId the ID of the processor + * @param user the user performing the request + * @return the status for the processor with the given ID, or null if the processor cannot be found + */ + ProcessorStatus getProcessorStatus(String processorId, NiFiUser user); /** * Returns the status for the components in the specified group with the diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java index 629ec5e2ab..774ced4131 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java @@ -51,12 +51,9 @@ public class RingBufferEventRepository implements FlowFileEventRepository { } @Override - public void purgeTransferEvents(final long cutoffEpochMilliseconds) { - // This is done so that if a processor is removed from the graph, its events - // will be removed rather than being kept in memory - for (final EventContainer container : componentEventMap.values()) { - container.purgeEvents(cutoffEpochMilliseconds); - } + public FlowFileEvent reportTransferEvents(final String componentId, final long now) { + final EventContainer container = componentEventMap.get(componentId); + return container == null ? null : container.generateReport(now); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index e30c00c49e..9e2ab49881 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -144,6 +144,21 @@ public class StandardEventAccess implements UserAwareEventAccess { return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis()); } + @Override + public ProcessorStatus getProcessorStatus(final String processorId, final NiFiUser user) { + final ProcessorNode procNode = flowController.getFlowManager().getProcessorNode(processorId); + if (procNode == null) { + return null; + } + + FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorId, System.currentTimeMillis()); + if (flowFileEvent == null) { + flowFileEvent = EmptyFlowFileEvent.INSTANCE; + } + + final Predicate authorizer = authorizable -> authorizable.isAuthorized(flowController.getAuthorizer(), RequestAction.READ, user); + return getProcessorStatus(flowFileEvent, procNode, authorizer); + } /** * Returns the status for components in the specified group. This request is @@ -659,6 +674,11 @@ public class StandardEventAccess implements UserAwareEventAccess { } private ProcessorStatus getProcessorStatus(final RepositoryStatusReport report, final ProcessorNode procNode, final Predicate isAuthorized) { + final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier()); + return getProcessorStatus(entry, procNode, isAuthorized); + } + + private ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode, final Predicate isAuthorized) { final boolean isProcessorAuthorized = isAuthorized.evaluate(procNode); final ProcessScheduler processScheduler = flowController.getProcessScheduler(); @@ -669,37 +689,36 @@ public class StandardEventAccess implements UserAwareEventAccess { status.setName(isProcessorAuthorized ? procNode.getName() : procNode.getIdentifier()); status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor"); - final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier()); - if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) { - final int processedCount = entry.getFlowFilesOut(); - final long numProcessedBytes = entry.getContentSizeOut(); + if (flowFileEvent != null && flowFileEvent != EmptyFlowFileEvent.INSTANCE) { + final int processedCount = flowFileEvent.getFlowFilesOut(); + final long numProcessedBytes = flowFileEvent.getContentSizeOut(); status.setOutputBytes(numProcessedBytes); status.setOutputCount(processedCount); - final int inputCount = entry.getFlowFilesIn(); - final long inputBytes = entry.getContentSizeIn(); + final int inputCount = flowFileEvent.getFlowFilesIn(); + final long inputBytes = flowFileEvent.getContentSizeIn(); status.setInputBytes(inputBytes); status.setInputCount(inputCount); - final long readBytes = entry.getBytesRead(); + final long readBytes = flowFileEvent.getBytesRead(); status.setBytesRead(readBytes); - final long writtenBytes = entry.getBytesWritten(); + final long writtenBytes = flowFileEvent.getBytesWritten(); status.setBytesWritten(writtenBytes); - status.setProcessingNanos(entry.getProcessingNanoseconds()); - status.setInvocations(entry.getInvocations()); + status.setProcessingNanos(flowFileEvent.getProcessingNanoseconds()); + status.setInvocations(flowFileEvent.getInvocations()); - status.setAverageLineageDuration(entry.getAverageLineageMillis()); + status.setAverageLineageDuration(flowFileEvent.getAverageLineageMillis()); - status.setFlowFilesReceived(entry.getFlowFilesReceived()); - status.setBytesReceived(entry.getBytesReceived()); - status.setFlowFilesSent(entry.getFlowFilesSent()); - status.setBytesSent(entry.getBytesSent()); - status.setFlowFilesRemoved(entry.getFlowFilesRemoved()); + status.setFlowFilesReceived(flowFileEvent.getFlowFilesReceived()); + status.setBytesReceived(flowFileEvent.getBytesReceived()); + status.setFlowFilesSent(flowFileEvent.getFlowFilesSent()); + status.setBytesSent(flowFileEvent.getBytesSent()); + status.setFlowFilesRemoved(flowFileEvent.getFlowFilesRemoved()); if (isProcessorAuthorized) { - status.setCounters(entry.getCounters()); + status.setCounters(flowFileEvent.getCounters()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 910b4932e5..4f3ffa6d51 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -108,6 +108,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryEntity; @@ -600,6 +601,13 @@ public interface NiFiServiceFacade { */ Set getProcessors(String groupId, boolean includeDescendants); + /** + * Provides a ProcessorsRunStatusDetails that describes the current details of the run status for each processor whose id is provided + * @param processorIds the set of all processor IDs that should be included + * @return a ProcessorsRunStatusDetailsEntity that describes the current information about the processors' run status + */ + ProcessorsRunStatusDetailsEntity getProcessorsRunStatusDetails(Set processorIds, NiFiUser user); + /** * Verifies the specified processor can be updated. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index f7a0659d9c..da34a3886b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -195,6 +195,7 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.PropertyHistoryDTO; import org.apache.nifi.web.api.dto.RegistryDTO; @@ -268,6 +269,8 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryEntity; @@ -3320,7 +3323,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities); + final ProcessorDTO processorDTO = dtoFactory.createProcessorDto(processor); + return entityFactory.createProcessorEntity(processorDTO, revision, permissions, operatePermissions, status, bulletinEntities); } @Override @@ -3332,6 +3336,38 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .collect(Collectors.toSet()); } + @Override + public ProcessorsRunStatusDetailsEntity getProcessorsRunStatusDetails(final Set processorIds, final NiFiUser user) { + final List runStatusDetails = processorIds.stream() + .map(processorDAO::getProcessor) + .map(processor -> createRunStatusDetailsEntity(processor, user)) + .collect(Collectors.toList()); + + final ProcessorsRunStatusDetailsEntity entity = new ProcessorsRunStatusDetailsEntity(); + entity.setRunStatusDetails(runStatusDetails); + return entity; + } + + private ProcessorRunStatusDetailsEntity createRunStatusDetailsEntity(final ProcessorNode processor, final NiFiUser user) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); + final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(processor.getIdentifier()); + final ProcessorRunStatusDetailsDTO runStatusDetailsDto = dtoFactory.createProcessorRunStatusDetailsDto(processor, processorStatus); + + if (!permissions.getCanRead()) { + runStatusDetailsDto.setName(null); + runStatusDetailsDto.setValidationErrors(null); + } + + final ProcessorRunStatusDetailsEntity entity = new ProcessorRunStatusDetailsEntity(); + entity.setPermissions(permissions); + entity.setRevision(revision); + entity.setRunStatusDetails(runStatusDetailsDto); + return entity; + } + + + @Override public TemplateDTO exportTemplate(final String id) { final Template template = templateDAO.getTemplate(id); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index d069b7f72a..5aed25f453 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -47,6 +47,8 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity; import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorRunStatusEntity; +import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.RunStatusDetailsRequestEntity; import org.apache.nifi.web.api.entity.PropertyDescriptorEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; @@ -212,6 +214,50 @@ public class ProcessorResource extends ApplicationResource { return generateOkResponse(entity).build(); } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("/run-status-details/queries") + @ApiOperation( + value = "Submits a query to retrieve the run status details of all processors that are in the given list of Processor IDs", + response = ProcessorsRunStatusDetailsEntity.class, + authorizations = { + @Authorization(value = "Read - /processors/{uuid} for each processor whose run status information is requested") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getProcessorRunStatusDetails( + @ApiParam(value = "The request for the processors that should be included in the results") + final RunStatusDetailsRequestEntity requestEntity) { + + if (requestEntity.getProcessorIds() == null) { + throw new IllegalArgumentException("List of Processor IDs must be provided"); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST, requestEntity); + } + + return withWriteLock(serviceFacade, + requestEntity, + lookup -> {}, + null, + providedEntity -> { + final ProcessorsRunStatusDetailsEntity entity = serviceFacade.getProcessorsRunStatusDetails(requestEntity.getProcessorIds(), NiFiUserUtils.getNiFiUser()); + return generateOkResponse(entity).build(); + }); + } + + @DELETE @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 2fcca8b6dc..ff858fc169 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1284,6 +1284,32 @@ public final class DtoFactory { return dto; } + public ProcessorRunStatusDetailsDTO createProcessorRunStatusDetailsDto(final ProcessorNode processor, final ProcessorStatus processorStatus) { + final ProcessorRunStatusDetailsDTO dto = new ProcessorRunStatusDetailsDTO(); + dto.setId(processor.getIdentifier()); + dto.setName(processor.getName()); + dto.setActiveThreadCount(processorStatus.getActiveThreadCount()); + dto.setRunStatus(processorStatus.getRunStatus().name()); + dto.setValidationErrors(convertValidationErrors(processor.getValidationErrors())); + return dto; + } + + private Set convertValidationErrors(final Collection validationErrors) { + if (validationErrors == null) { + return null; + } + if (validationErrors.isEmpty()) { + return Collections.emptySet(); + } + + final Set errors = new HashSet<>(validationErrors.size()); + for (final ValidationResult result : validationErrors) { + errors.add(result.toString()); + } + + return errors; + } + /** * Creates a PortStatusDTO for the specified PortStatus. * @@ -3085,7 +3111,6 @@ public final class DtoFactory { /** * Creates a ProcessorDTO from the specified ProcessorNode. - * * @param node node * @return dto */ @@ -3130,7 +3155,7 @@ public final class DtoFactory { } // sort the relationships - Collections.sort(relationships, new Comparator() { + relationships.sort(new Comparator() { @Override public int compare(final RelationshipDTO r1, final RelationshipDTO r2) { return Collator.getInstance(Locale.US).compare(r1.getName(), r2.getName()); @@ -3144,9 +3169,10 @@ public final class DtoFactory { dto.setSupportsParallelProcessing(!node.isTriggeredSerially()); dto.setSupportsEventDriven(node.isEventDrivenSupported()); dto.setSupportsBatching(node.isSessionBatchingSupported()); + dto.setConfig(createProcessorConfigDto(node)); - final ValidationStatus validationStatus = node.getValidationStatus(1, TimeUnit.MILLISECONDS); + final ValidationStatus validationStatus = node.getValidationStatus(); dto.setValidationStatus(validationStatus.name()); final Collection validationErrors = node.getValidationErrors(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 3859daaa22..b5e0bd2556 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -634,22 +634,7 @@ public class ControllerFacade implements Authorizable { * @return the status for the specified processor */ public ProcessorStatus getProcessorStatus(final String processorId) { - final ProcessGroup root = getRootGroup(); - final ProcessorNode processor = root.findProcessor(processorId); - - // ensure the processor was found - if (processor == null) { - throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId)); - } - - // calculate the process group status - final String groupId = processor.getProcessGroup().getIdentifier(); - final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1); - if (processGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); - } - - final ProcessorStatus status = processGroupStatus.getProcessorStatus().stream().filter(processorStatus -> processorId.equals(processorStatus.getId())).findFirst().orElse(null); + final ProcessorStatus status = flowController.getEventAccess().getProcessorStatus(processorId, NiFiUserUtils.getNiFiUser()); if (status == null) { throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 2813daf7db..6c20b7efa3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -32,16 +32,16 @@ import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget; import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.DtoFactory; -import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO; import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.ComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorsEntity; +import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.RunStatusDetailsRequestEntity; +import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +53,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response.Status; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -171,13 +172,17 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle URI groupUri; try { groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), - originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); + originalUri.getPort(), "/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment()); } catch (URISyntaxException e) { throw new RuntimeException(e); } final Map headers = new HashMap<>(); - final MultivaluedMap requestEntity = new MultivaluedHashMap<>(); + final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity(); + final Set processorIds = processors.values().stream() + .map(AffectedComponentEntity::getId) + .collect(Collectors.toSet()); + requestEntity.setProcessorIds(processorIds); boolean continuePolling = true; while (continuePolling) { @@ -185,20 +190,19 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. final NodeResponse clusterResponse; if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse(); } else { clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse(); } if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { return false; } - final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); - final Set processorEntities = processorsEntity.getProcessors(); + final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class); - if (isProcessorValidationComplete(processorEntities, processors)) { + if (isProcessorValidationComplete(runStatusDetailsEntity, processors)) { logger.debug("All {} processors of interest now have been validated", processors.size()); return true; } @@ -210,14 +214,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle return false; } - private boolean isProcessorValidationComplete(Set processorEntities, Map affectedComponents) { - updateAffectedProcessors(processorEntities, affectedComponents); - for (final ProcessorEntity entity : processorEntities) { - if (!affectedComponents.containsKey(entity.getId())) { + private boolean isProcessorValidationComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map affectedComponents) { + updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents); + + for (final ProcessorRunStatusDetailsEntity statusDetailsEntity : runStatusDetailsEntity.getRunStatusDetails()) { + final ProcessorRunStatusDetailsDTO runStatusDetails = statusDetailsEntity.getRunStatusDetails(); + if (!affectedComponents.containsKey(runStatusDetails.getId())) { continue; } - if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) { + if (ProcessorRunStatusDetailsDTO.VALIDATING.equals(runStatusDetails.getRunStatus())) { return false; } } @@ -241,14 +247,20 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle URI groupUri; try { - groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), - originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), + "/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment()); } catch (URISyntaxException e) { throw new RuntimeException(e); } final Map headers = new HashMap<>(); - final MultivaluedMap requestEntity = new MultivaluedHashMap<>(); + + final Set processorIds = processors.values().stream() + .map(AffectedComponentEntity::getId) + .collect(Collectors.toSet()); + + final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity(); + requestEntity.setProcessorIds(processorIds); boolean continuePolling = true; while (continuePolling) { @@ -256,20 +268,18 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. final NodeResponse clusterResponse; if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse(); } else { clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse(); } if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { return false; } - final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); - final Set processorEntities = processorsEntity.getProcessors(); - - if (isProcessorActionComplete(processorEntities, processors, desiredState, invalidComponentAction)) { + final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class); + if (isProcessorActionComplete(runStatusDetailsEntity, processors, desiredState, invalidComponentAction)) { logger.debug("All {} processors of interest now have the desired state of {}", processors.size(), desiredState); return true; } @@ -298,41 +308,44 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle return entity; } - private void updateAffectedProcessors(final Set processorEntities, final Map affectedComponents) { + + private void updateAffectedProcessors(final Collection runStatusDetailsEntities, final Map affectedComponents) { // update the affected processors - processorEntities.stream() - .filter(entity -> affectedComponents.containsKey(entity.getId())) - .forEach(entity -> { - final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); - affectedComponentEntity.setRevision(entity.getRevision()); + runStatusDetailsEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getRunStatusDetails().getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getRunStatusDetails().getId()); + affectedComponentEntity.setRevision(entity.getRevision()); - // only consider update this component if the user had permissions to it - if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { - final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); - affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); - affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); + final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails(); - if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { - affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); - } + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(runStatusDetailsDto.getRunStatus()); + affectedComponent.setActiveThreadCount(runStatusDetailsDto.getActiveThreadCount()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(runStatusDetailsDto.getValidationErrors()); } - }); + } + }); } - private boolean isProcessorActionComplete(final Set processorEntities, final Map affectedComponents, final ScheduledState desiredState, - final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException { + + private boolean isProcessorActionComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map affectedComponents, + final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException { final String desiredStateName = desiredState.name(); - updateAffectedProcessors(processorEntities, affectedComponents); + updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents); - for (final ProcessorEntity entity : processorEntities) { - if (!affectedComponents.containsKey(entity.getId())) { + for (final ProcessorRunStatusDetailsEntity entity : runStatusDetailsEntity.getRunStatusDetails()) { + final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails(); + if (!affectedComponents.containsKey(runStatusDetailsDto.getId())) { continue; } - final ProcessorStatusDTO status = entity.getStatus(); - - if (ProcessorDTO.INVALID.equals(entity.getComponent().getValidationStatus())) { + if (ProcessorRunStatusDetailsDTO.INVALID.equals(runStatusDetailsDto.getRunStatus())) { switch (invalidComponentAction) { case WAIT: return false; @@ -340,17 +353,17 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle continue; case FAIL: final String action = desiredState == ScheduledState.RUNNING ? "start" : "stop"; - throw new LifecycleManagementException("Could not " + action + " " + entity.getComponent().getName() + " because it is invalid"); + throw new LifecycleManagementException("Could not " + action + " " + runStatusDetailsDto.getName() + " because it is invalid"); } } - final String runStatus = status.getAggregateSnapshot().getRunStatus(); + final String runStatus = runStatusDetailsDto.getRunStatus(); final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); if (!stateMatches) { return false; } - if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) { + if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) { return false; } }