diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java index 7aae8665d6..45acf8e206 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java @@ -43,6 +43,8 @@ public class ProcessGroupStatus implements Cloneable { private long bytesReceived; private int flowFilesSent; private long bytesSent; + private int flowFilesTransferred; + private long bytesTransferred; private Collection connectionStatus = new ArrayList<>(); private Collection processorStatus = new ArrayList<>(); @@ -227,6 +229,22 @@ public class ProcessGroupStatus implements Cloneable { this.bytesSent = bytesSent; } + public int getFlowFilesTransferred() { + return flowFilesTransferred; + } + + public void setFlowFilesTransferred(int flowFilesTransferred) { + this.flowFilesTransferred = flowFilesTransferred; + } + + public long getBytesTransferred() { + return bytesTransferred; + } + + public void setBytesTransferred(long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + } + @Override public ProcessGroupStatus clone() { @@ -248,6 +266,8 @@ public class ProcessGroupStatus implements Cloneable { clonedObj.bytesReceived = bytesReceived; clonedObj.flowFilesSent = flowFilesSent; clonedObj.bytesSent = bytesSent; + clonedObj.flowFilesTransferred = flowFilesTransferred; + clonedObj.bytesTransferred = bytesTransferred; if (connectionStatus != null) { final Collection statusList = new ArrayList<>(); @@ -317,6 +337,18 @@ public class ProcessGroupStatus implements Cloneable { builder.append(creationTimestamp); builder.append(", activeThreadCount="); builder.append(activeThreadCount); + builder.append(", flowFilesTransferred="); + builder.append(flowFilesTransferred); + builder.append(", bytesTransferred="); + builder.append(bytesTransferred); + builder.append(", flowFilesReceived="); + builder.append(flowFilesReceived); + builder.append(", bytesReceived="); + builder.append(bytesReceived); + builder.append(", flowFilesSent="); + builder.append(flowFilesSent); + builder.append(", bytesSent="); + builder.append(bytesSent); builder.append(",\n\tconnectionStatus="); for (final ConnectionStatus status : connectionStatus) { @@ -374,6 +406,12 @@ public class ProcessGroupStatus implements Cloneable { target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred()); + target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred()); + target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); + target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); + target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); + target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); // connection status // sort by id diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java new file mode 100644 index 0000000000..08d76a5116 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto.status; + +import java.util.Collection; +import java.util.Date; +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.web.api.dto.util.TimeAdapter; + +/** + * DTO for serializing the a process group's status across the cluster. + */ +@XmlType(name = "clusterProcessGroupStatus") +public class ClusterProcessGroupStatusDTO { + + private Collection nodeProcessGroupStatus; + private Date statsLastRefreshed; + private String processGroupId; + private String processGroupName; + + /** + * The time the status were last refreshed. + * + * @return The time the status were last refreshed + */ + @XmlJavaTypeAdapter(TimeAdapter.class) + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } + + /** + * The process group id. + * + * @return The process group id + */ + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(String processGroupId) { + this.processGroupId = processGroupId; + } + + /** + * The process group name. + * + * @return The process group name + */ + public String getProcessGroupName() { + return processGroupName; + } + + public void setProcessGroupName(String processGroupName) { + this.processGroupName = processGroupName; + } + + /** + * Collection of node process group status DTO. + * + * @return The collection of node process group status DTO + */ + public Collection getNodeProcessGroupStatus() { + return nodeProcessGroupStatus; + } + + public void setNodeProcessGroupStatus(Collection nodeProcessGroupStatus) { + this.nodeProcessGroupStatus = nodeProcessGroupStatus; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java new file mode 100644 index 0000000000..5f965b2903 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto.status; + +import javax.xml.bind.annotation.XmlType; +import org.apache.nifi.web.api.dto.NodeDTO; + +/** + * DTO for serializing the process group status for a particular node. + */ +@XmlType(name = "nodeProcessGroupStatus") +public class NodeProcessGroupStatusDTO { + + private NodeDTO node; + private ProcessGroupStatusDTO processGroupStatus; + + /** + * The node. + * + * @return The node DTO + */ + public NodeDTO getNode() { + return node; + } + + public void setNode(NodeDTO node) { + this.node = node; + } + + /** + * The process group's status. + * + * @return The process group status + */ + public ProcessGroupStatusDTO getProcessGroupStatus() { + return processGroupStatus; + } + + public void setProcessGroupStatus(ProcessGroupStatusDTO processGroupStatus) { + this.processGroupStatus = processGroupStatus; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java index 2193fb0f82..7ad24a94c3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java @@ -38,10 +38,15 @@ public class ProcessGroupStatusDTO extends StatusDTO { private Collection outputPortStatus; private String input; + private String queuedCount; + private String queuedSize; private String queued; private String read; private String written; private String output; + private String transferred; + private String received; + private String sent; private Integer activeThreadCount; private Date statsLastRefreshed; @@ -171,6 +176,74 @@ public class ProcessGroupStatusDTO extends StatusDTO { this.output = output; } + /** + * The transferred stats for this process group. This represents the + * count/size of flowfiles transferred to/from queues. + * + * @return The transferred status for this process group + */ + public String getTransferred() { + return transferred; + } + + public void setTransferred(String transferred) { + this.transferred = transferred; + } + + /** + * The received stats for this process group. This represents the count/size + * of flowfiles received. + * + * @return The received stats for this process group + */ + public String getReceived() { + return received; + } + + public void setReceived(String received) { + this.received = received; + } + + /** + * The sent stats for this process group. This represents the count/size of + * flowfiles sent. + * + * @return The sent stats for this process group + */ + public String getSent() { + return sent; + } + + public void setSent(String sent) { + this.sent = sent; + } + + /** + * The queued count for this process group. + * + * @return The queued count for this process group + */ + public String getQueuedCount() { + return queuedCount; + } + + public void setQueuedCount(String queuedCount) { + this.queuedCount = queuedCount; + } + + /** + * The queued size for this process group. + * + * @return The queued size for this process group + */ + public String getQueuedSize() { + return queuedSize; + } + + public void setQueuedSize(String queuedSize) { + this.queuedSize = queuedSize; + } + /** * The queued stats for this process group. * diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java new file mode 100644 index 0000000000..cddb21ab43 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; + +/** + * A serialized representation of this class can be placed in the entity body of + * a request or response to or from the API. This particular entity holds a + * reference to a ClusterProcessGroupStatusDTO. + */ +@XmlRootElement(name = "clusterProcessGroupStatusEntity") +public class ClusterProcessGroupStatusEntity extends Entity { + + private ClusterProcessGroupStatusDTO clusterProcessGroupStatus; + + /** + * The ClusterProcessGroupStatusDTO that is being serialized. + * + * @return The ClusterProcessGroupStatusDTO object + */ + public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus() { + return clusterProcessGroupStatus; + } + + public void setClusterProcessGroupStatus(ClusterProcessGroupStatusDTO clusterProcessGroupStatus) { + this.clusterProcessGroupStatus = clusterProcessGroupStatus; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 07e754e38e..0d7699a425 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2063,6 +2063,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R long bytesReceived = 0L; int flowFilesSent = 0; long bytesSent = 0L; + int flowFilesTransferred = 0; + long bytesTransferred = 0; // set status for processors final Collection processorStatusCollection = new ArrayList<>(); @@ -2096,6 +2098,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bytesReceived += childGroupStatus.getBytesReceived(); flowFilesSent += childGroupStatus.getFlowFilesSent(); bytesSent += childGroupStatus.getBytesSent(); + + flowFilesTransferred += childGroupStatus.getFlowFilesTransferred(); + bytesTransferred += childGroupStatus.getBytesTransferred(); } // set status for remote child groups @@ -2133,6 +2138,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R connStatus.setInputCount(connectionStatusReport.getFlowFilesIn()); connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut()); connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut()); + + flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut()); + bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut()); } if (StringUtils.isNotBlank(conn.getName())) { @@ -2303,6 +2311,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setBytesReceived(bytesReceived); status.setFlowFilesSent(flowFilesSent); status.setBytesSent(bytesSent); + status.setFlowFilesTransferred(flowFilesTransferred); + status.setBytesTransferred(bytesTransferred); return status; } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 6cf22c00d9..c98b1e43c7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -63,6 +63,7 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; @@ -774,8 +775,7 @@ public interface NiFiServiceFacade { void verifyUpdateRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO); /** - * Verifies the specified remote process group can update the specified - * remote input port. + * Verifies the specified remote process group can update the specified remote input port. * * @param groupId The id of the parent group * @param remoteProcessGroupId The id of the remote process group @@ -784,8 +784,7 @@ public interface NiFiServiceFacade { void verifyUpdateRemoteProcessGroupInputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO); /** - * Verifies the specified remote process group can update the specified - * remote output port. + * Verifies the specified remote process group can update the specified remote output port. * * @param groupId The id of the parent group * @param remoteProcessGroupId The id of the remote process group @@ -977,13 +976,12 @@ public interface NiFiServiceFacade { * Gets the specified controller service. * * @param controllerServiceId id - * @return service + * @return service */ ControllerServiceDTO getControllerService(String controllerServiceId); /** - * Get the descriptor for the specified property of the specified controller - * service. + * Get the descriptor for the specified property of the specified controller service. * * @param id id * @param property property @@ -1037,8 +1035,7 @@ public interface NiFiServiceFacade { void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO); /** - * Verifies the referencing components of the specified controller service - * can be updated. + * Verifies the referencing components of the specified controller service can be updated. * * @param controllerServiceId id * @param scheduledState schedule state @@ -1081,8 +1078,7 @@ public interface NiFiServiceFacade { ReportingTaskDTO getReportingTask(String reportingTaskId); /** - * Get the descriptor for the specified property of the specified reporting - * task. + * Get the descriptor for the specified property of the specified reporting task. * * @param id id * @param property property @@ -1196,8 +1192,7 @@ public interface NiFiServiceFacade { void verifyUpdateSnippet(SnippetDTO snippetDto); /** - * If group id is specified, moves the specified snippet to the specified - * group. + * If group id is specified, moves the specified snippet to the specified group. * * @param revision revision * @param snippetDto snippet @@ -1256,8 +1251,7 @@ public interface NiFiServiceFacade { void invalidateUser(String userId); /** - * Invalidates the specified user accounts and all accounts associated with - * this group. + * Invalidates the specified user accounts and all accounts associated with this group. * * @param userGroup group * @param userIds id @@ -1272,8 +1266,7 @@ public interface NiFiServiceFacade { void deleteUser(String userId); /** - * Updates a user group with the specified group and comprised of the - * specified users. + * Updates a user group with the specified group and comprised of the specified users. * * @param userGroup group * @return group @@ -1298,8 +1291,7 @@ public interface NiFiServiceFacade { // Cluster methods // ---------------------------------------- /** - * @return true if controller is connected or trying to connect to the - * cluster + * @return true if controller is connected or trying to connect to the cluster */ boolean isClustered(); @@ -1369,8 +1361,7 @@ public interface NiFiServiceFacade { /** * @param processorId id - * @return the processor status history for each node connected to the - * cluster + * @return the processor status history for each node connected to the cluster */ ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId); @@ -1384,28 +1375,34 @@ public interface NiFiServiceFacade { /** * @param connectionId id - * @return the connection status history for each node connected to the - * cluster + * @return the connection status history for each node connected to the cluster */ ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId); /** * @param processGroupId id - * @return the process group status history for each node connected to the - * cluster + * @return the process group status history for each node connected to the cluster */ ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId); /** - * @param remoteProcessGroupId id - * @return the remote process group status history for each node connected - * to the cluster + * Returns a process group's status for each node connected to the cluster. + * + * @param processorId a process group identifier + * @return The cluster process group status transfer object. + */ + ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId); + + /** + * Returns the remote process group status history for each node connected to the cluster. + * + * @param remoteProcessGroupId a remote process group identifier + * @return The cluster status history */ ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId); /** - * Returns a remote process group's status for each node connected to the - * cluster. + * Returns a remote process group's status for each node connected to the cluster. * * @param remoteProcessGroupId a remote process group identifier * @return The cluster remote process group status transfer object. diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index df4cdf1d07..fbd47428c7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -163,6 +163,8 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO; import org.apache.nifi.web.dao.ControllerServiceDAO; import org.apache.nifi.web.dao.ReportingTaskDAO; import org.slf4j.Logger; @@ -2447,6 +2449,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return clusterConnectionStatusDto; } + private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) { + ProcessGroupStatus processGroupStatus = null; + + if (processGroupId.equals(groupStatus.getId())) { + processGroupStatus = groupStatus; + } + + if (processGroupStatus == null) { + for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { + processGroupStatus = findNodeProcessGroupStatus(status, processGroupId); + + if (processGroupStatus != null) { + break; + } + } + } + + return processGroupStatus; + } + + @Override + public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) { + + final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO(); + clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList()); + + // set the current time + clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date()); + + final Set nodes = clusterManager.getNodes(Node.Status.CONNECTED); + boolean firstNode = true; + for (final Node node : nodes) { + + final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); + if (nodeHeartbeatPayload == null) { + continue; + } + + final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); + if (nodeStats == null || nodeStats.getProcessorStatus() == null) { + continue; + } + + // attempt to find the process group stats for this node + final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId); + + // sanity check that we have status for this process group + if (processGroupStatus == null) { + throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId)); + } + + if (firstNode) { + clusterProcessGroupStatusDto.setProcessGroupId(processGroupId); + clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName()); + firstNode = false; + } + + // create node process group status dto + final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO(); + clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO); + + // populate node process group status dto + final String nodeId = node.getNodeId().getId(); + nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); + nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus)); + + } + + return clusterProcessGroupStatusDto; + } + private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) { PortStatus portStatus = null; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java index 3a747820ec..a99d7dfd61 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java @@ -69,6 +69,8 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.security.access.prepost.PreAuthorize; import com.sun.jersey.api.core.ResourceContext; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; +import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity; import org.codehaus.enunciate.jaxrs.TypeHint; /** @@ -95,9 +97,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the status of this NiFi cluster. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @return A clusterStatusEntity */ @GET @@ -144,12 +144,9 @@ public class ClusterResource extends ApplicationResource { } /** - * Gets the contents of this NiFi cluster. This includes all nodes and their - * status. + * Gets the contents of this NiFi cluster. This includes all nodes and their status. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @return A clusterEntity */ @GET @@ -231,9 +228,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the processor. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A processorEntity */ @@ -267,11 +262,8 @@ public class ClusterResource extends ApplicationResource { * Updates the processors annotation data. * * @param httpServletRequest - * @param version The revision is used to verify the client is working with - * the latest version of the flow. - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param version The revision is used to verify the client is working with the latest version of the flow. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param processorId The id of the processor. * @param annotationData The annotation data to set. * @return A processorEntity. @@ -395,9 +387,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the processor status for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A clusterProcessorStatusEntity */ @@ -431,9 +421,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the processor status history for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A clusterProcessorStatusHistoryEntity */ @@ -466,9 +454,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the connection status for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A clusterProcessorStatusEntity */ @@ -502,9 +488,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the connections status history for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A clusterProcessorStatusHistoryEntity */ @@ -534,12 +518,44 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); } + /** + * Gets the process group status for every node. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the process group + * @return A clusterProcessGroupStatusEntity + */ + @GET + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/process-groups/{id}/status") + @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @TypeHint(ClusterConnectionStatusEntity.class) + public Response getProcessGroupStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) { + + if (properties.isClusterManager()) { + + final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id); + + // create the revision + RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create entity + final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity(); + entity.setClusterProcessGroupStatus(dto); + entity.setRevision(revision); + + // generate the response + return generateOkResponse(entity).build(); + } + + throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); + } + /** * Gets the process group status history for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the process group * @return A clusterProcessGroupStatusHistoryEntity */ @@ -572,9 +588,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the remote process group status for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the remote process group * @return A clusterRemoteProcessGroupStatusEntity */ @@ -608,9 +622,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the input port status for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the input port * @return A clusterPortStatusEntity */ @@ -644,9 +656,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the output port status for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the output port * @return A clusterPortStatusEntity */ @@ -680,9 +690,7 @@ public class ClusterResource extends ApplicationResource { /** * Gets the remote process group status history for every node. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param id The id of the processor * @return A clusterRemoteProcessGroupStatusHistoryEntity */ diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 56ee9badad..2402b73f89 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -506,13 +506,21 @@ public final class DtoFactory { processGroupStatusDto.setId(processGroupStatus.getId()); processGroupStatusDto.setName(processGroupStatus.getName()); processGroupStatusDto.setStatsLastRefreshed(new Date(processGroupStatus.getCreationTimestamp())); - processGroupStatusDto.setQueued(formatCount(processGroupStatus.getQueuedCount()) + " / " + formatDataSize(processGroupStatus.getQueuedContentSize())); processGroupStatusDto.setRead(formatDataSize(processGroupStatus.getBytesRead())); processGroupStatusDto.setWritten(formatDataSize(processGroupStatus.getBytesWritten())); processGroupStatusDto.setInput(formatCount(processGroupStatus.getInputCount()) + " / " + formatDataSize(processGroupStatus.getInputContentSize())); processGroupStatusDto.setOutput(formatCount(processGroupStatus.getOutputCount()) + " / " + formatDataSize(processGroupStatus.getOutputContentSize())); + processGroupStatusDto.setTransferred(formatCount(processGroupStatus.getFlowFilesTransferred()) + " / " + formatDataSize(processGroupStatus.getBytesTransferred())); + processGroupStatusDto.setSent(formatCount(processGroupStatus.getFlowFilesSent()) + " / " + formatDataSize(processGroupStatus.getBytesSent())); + processGroupStatusDto.setReceived(formatCount(processGroupStatus.getFlowFilesReceived()) + " / " + formatDataSize(processGroupStatus.getBytesReceived())); processGroupStatusDto.setActiveThreadCount(processGroupStatus.getActiveThreadCount()); + final String queuedCount = FormatUtils.formatCount(processGroupStatus.getQueuedCount()); + final String queuedSize = FormatUtils.formatDataSize(processGroupStatus.getQueuedContentSize()); + processGroupStatusDto.setQueuedCount(queuedCount); + processGroupStatusDto.setQueuedSize(queuedSize); + processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize); + final Map componentStatusDtoMap = new HashMap<>(); // processor status @@ -1504,8 +1512,7 @@ public final class DtoFactory { } /** - * Creates a ProvenanceEventNodeDTO for the specified - * ProvenanceEventLineageNode. + * Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode. * * @param node * @return @@ -2158,9 +2165,8 @@ public final class DtoFactory { /** * * @param original - * @param deep if true, all Connections, ProcessGroups, Ports, - * Processors, etc. will be copied. If false, the copy will - * have links to the same objects referenced by original. + * @param deep if true, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If false, the copy will have links to the same objects referenced by + * original. * * @return */ diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp index 032509bd47..e6f330582d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp @@ -74,6 +74,7 @@ +
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp new file mode 100644 index 0000000000..94526d0282 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp @@ -0,0 +1,36 @@ +<%-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--%> +<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %> +
+
+
+
+
+ Last updated:  +
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp index a419baa15f..5be3e2bab0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp @@ -51,6 +51,9 @@
+
+
+
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css index bb7b9c46b2..e882f8957b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + /* Status Styles */ @@ -386,6 +387,12 @@ span.sorted { text-decoration: underline; } +/* tooltips in the summary table */ + +#summary .nifi-tooltip { + max-width: 500px; +} + /* cluster processor summary table */ #cluster-processor-summary-dialog { @@ -765,4 +772,80 @@ span.sorted { white-space: nowrap; overflow: hidden; width: 200px; +} + +/* cluster process group summary table */ + +#cluster-process-group-summary-dialog { + display: none; + width: 778px; + height: 450px; + z-index: 1301; +} + +#cluster-process-group-summary-table { + width: 758px; + height: 300px; + border-bottom: 1px solid #666; +} + +#cluster-process-group-summary-header { + height: 26px; + color: #666; + font-weight: normal; + margin-bottom: 1px; +} + +#cluster-process-group-refresh-button { + height: 24px; + width: 26px; + float: left; +} + +#cluster-process-group-summary-last-refreshed-container { + float: left; + margin-top: 6px; + margin-left: 3px; + -webkit-user-select: none; + -moz-user-select: none; +} + +#cluster-process-group-summary-last-refreshed { + font-weight: bold; +} + +#cluster-process-group-summary-loading-container { + float: left; + width: 16px; + height: 16px; + background-color: transparent; + margin-top: 4px; + margin-left: 3px; +} + +#cluster-process-group-details-container { + position: absolute; + right: 35px; +} + +#cluster-process-group-icon { + background-image: url(../images/iconProcessGroup.png); + width: 29px; + height: 20px; + float: left; + margin-right: 5px; + margin-top: 1px; +} + +#cluster-process-group-details { + float: left; +} + +#cluster-process-group-name { + margin-bottom: 2px; + color: #000; + font-weight: bold; + white-space: nowrap; + overflow: hidden; + width: 200px; } \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png new file mode 100644 index 0000000000..4ff5ac55ee Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png differ diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js index 2bd94d582c..fe4ed919a3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js @@ -31,6 +31,7 @@ nf.SummaryTable = (function () { processGroups: '../nifi-api/controller/process-groups/', clusterProcessor: '../nifi-api/cluster/processors/', clusterConnection: '../nifi-api/cluster/connections/', + clusterProcessGroup: '../nifi-api/cluster/process-groups/', clusterInputPort: '../nifi-api/cluster/input-ports/', clusterOutputPort: '../nifi-api/cluster/output-ports/', clusterRemoteProcessGroup: '../nifi-api/cluster/remote-process-groups/', @@ -130,6 +131,9 @@ nf.SummaryTable = (function () { }, { name: 'Connections', tabContentId: 'connection-summary-tab-content' + }, { + name: 'Process Groups', + tabContentId: 'process-group-summary-tab-content' }], select: function () { var tab = $(this).text(); @@ -190,12 +194,12 @@ nf.SummaryTable = (function () { if (nf.Common.isDefinedAndNotNull(inputPortsGrid)) { inputPortsGrid.resizeCanvas(); - // update the total number of connections + // update the total number of input ports $('#displayed-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength())); $('#total-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength())); } - // update the combo for connections + // update the combo for input ports $('#summary-filter-type').combo({ options: [{ text: 'by name', @@ -211,12 +215,12 @@ nf.SummaryTable = (function () { if (nf.Common.isDefinedAndNotNull(outputPortsGrid)) { outputPortsGrid.resizeCanvas(); - // update the total number of connections + // update the total number of output ports $('#displayed-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength())); $('#total-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength())); } - // update the combo for connections + // update the combo for output ports $('#summary-filter-type').combo({ options: [{ text: 'by name', @@ -226,18 +230,18 @@ nf.SummaryTable = (function () { applyFilter(); } }); - } else { + } else if (tab === 'Remote Process Groups') { // ensure the connection table is size properly var remoteProcessGroupsGrid = $('#remote-process-group-summary-table').data('gridInstance'); if (nf.Common.isDefinedAndNotNull(remoteProcessGroupsGrid)) { remoteProcessGroupsGrid.resizeCanvas(); - // update the total number of connections + // update the total number of remote process groups $('#displayed-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength())); $('#total-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength())); } - // update the combo for connections + // update the combo for remote process groups $('#summary-filter-type').combo({ options: [{ text: 'by name', @@ -250,6 +254,27 @@ nf.SummaryTable = (function () { applyFilter(); } }); + } else { + // ensure the connection table is size properly + var processGroupGrid = $('#process-group-summary-table').data('gridInstance'); + if (nf.Common.isDefinedAndNotNull(processGroupGrid)) { + processGroupGrid.resizeCanvas(); + + // update the total number of process groups + $('#displayed-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength())); + $('#total-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength())); + } + + // update the combo for process groups + $('#summary-filter-type').combo({ + options: [{ + text: 'by name', + value: 'name' + }], + select: function (option) { + applyFilter(); + } + }); } // reset the filter @@ -458,18 +483,18 @@ nf.SummaryTable = (function () { // show the tooltip if (nf.Common.isDefinedAndNotNull(tooltip)) { - bulletinIcon.qtip($.extend({ + bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, { content: tooltip, position: { - target: 'mouse', - viewport: $(window), + container: $('#summary'), + at: 'bottom right', + my: 'top left', adjust: { - x: 8, - y: 8, - method: 'flipinvert flipinvert' + x: 4, + y: 4 } } - }, nf.Common.config.tooltipConfig)); + })); } } }); @@ -783,7 +808,7 @@ nf.SummaryTable = (function () { // hold onto an instance of the grid $('#cluster-connection-summary-table').data('gridInstance', clusterConnectionsGrid); - // define a custom formatter for showing more port details + // define a custom formatter for showing more port/group details var moreDetails = function (row, cell, value, columnDef, dataContext) { var markup = ''; @@ -794,10 +819,260 @@ nf.SummaryTable = (function () { return markup; }; + + var moreDetailsColumn = {id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50}; + var transferredColumn = {id: 'transferred', field: 'transferred', name: 'Transferred / Size 5 min', toolTip: 'Count / data size transferred to and from connections in the last 5 min', resizable: true, defaultSortAsc: false, sortable: true}; + var sentColumn = {id: 'sent', field: 'sent', name: 'Sent / Size 5 min', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true}; + var receivedColumn = {id: 'received', field: 'received', name: 'Received / Size 5 min', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true}; + + // define the column model for the summary table + var processGroupsColumnModel = [ + moreDetailsColumn, + {id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true, formatter: valueFormatter}, + transferredColumn, + inputColumn, + ioColumn, + outputColumn, + sentColumn, + receivedColumn + ]; + + // add an action column if appropriate + if (isClustered || isInShell || nf.Common.SUPPORTS_SVG) { + // define how the column is formatted + var processGroupActionFormatter = function (row, cell, value, columnDef, dataContext) { + var markup = ''; + + if (isInShell && dataContext.groupId !== null) { + markup += ' '; + } + + if (nf.Common.SUPPORTS_SVG) { + if (isClustered) { + markup += ' '; + } else { + markup += ' '; + } + } + + if (isClustered) { + markup += ' '; + } + + return markup; + }; + + // define the action column for clusters and within the shell + processGroupsColumnModel.push({id: 'actions', name: ' ', formatter: processGroupActionFormatter, resizable: false, sortable: false, width: 75, maxWidth: 75}); + } + + // initialize the templates table + var processGroupsOptions = { + forceFitColumns: true, + enableTextSelectionOnCells: true, + enableCellNavigation: true, + enableColumnReorder: false, + autoEdit: false, + multiSelect: false + }; + + // initialize the dataview + var processGroupsData = new Slick.Data.DataView({ + inlineFilters: false + }); + processGroupsData.setItems([]); + processGroupsData.setFilterArgs({ + searchString: '', + property: 'name' + }); + processGroupsData.setFilter(filter); + + // initialize the sort + sort('process-group-summary-table', { + columnId: 'name', + sortAsc: true + }, processGroupsData); + + // initialize the grid + var processGroupsGrid = new Slick.Grid('#process-group-summary-table', processGroupsData, processGroupsColumnModel, processGroupsOptions); + processGroupsGrid.setSelectionModel(new Slick.RowSelectionModel()); + processGroupsGrid.registerPlugin(new Slick.AutoTooltips()); + processGroupsGrid.setSortColumn('name', true); + processGroupsGrid.onSort.subscribe(function (e, args) { + sort('process-group-summary-table', { + columnId: args.sortCol.field, + sortAsc: args.sortAsc + }, processGroupsData); + }); + + // configure a click listener + processGroupsGrid.onClick.subscribe(function (e, args) { + var target = $(e.target); + + // get the node at this row + var item = processGroupsData.getItem(args.row); + + // determine the desired action + if (processGroupsGrid.getColumns()[args.cell].id === 'actions') { + if (target.hasClass('go-to')) { + if (nf.Common.isDefinedAndNotNull(parent.nf) && nf.Common.isDefinedAndNotNull(parent.nf.CanvasUtils) && nf.Common.isDefinedAndNotNull(parent.nf.Shell)) { + parent.nf.CanvasUtils.enterGroup(item.id); + parent.$('#shell-close-button').click(); + } + } else if (target.hasClass('show-cluster-process-group-status-history')) { + nf.StatusHistory.showClusterProcessGroupChart(item.groupId, item.id); + } else if (target.hasClass('show-process-group-status-history')) { + nf.StatusHistory.showStandaloneProcessGroupChart(item.groupId, item.id); + } else if (target.hasClass('show-cluster-process-group-summary')) { + // load the cluster processor summary + loadClusterProcessGroupSummary(item.id); + + // hide the summary loading indicator + $('#summary-loading-container').hide(); + + // show the dialog + $('#cluster-process-group-summary-dialog').modal('show'); + } + } + }); + + // wire up the dataview to the grid + processGroupsData.onRowCountChanged.subscribe(function (e, args) { + processGroupsGrid.updateRowCount(); + processGroupsGrid.render(); + + // update the total number of displayed process groups if necessary + if ($('#process-group-summary-table').is(':visible')) { + $('#displayed-items').text(nf.Common.formatInteger(args.current)); + } + }); + processGroupsData.onRowsChanged.subscribe(function (e, args) { + processGroupsGrid.invalidateRows(args.rows); + processGroupsGrid.render(); + }); + + // hold onto an instance of the grid + $('#process-group-summary-table').data('gridInstance', processGroupsGrid).on('mouseenter', 'div.slick-cell', function (e) { + var bulletinIcon = $(this).find('img.has-bulletins'); + if (bulletinIcon.length && !bulletinIcon.data('qtip')) { + var processGroupId = $(this).find('span.row-id').text(); + + // get the status item + var item = processGroupsData.getItemById(processGroupId); + + // format the tooltip + var bulletins = nf.Common.getFormattedBulletins(item.bulletins); + var tooltip = nf.Common.formatUnorderedList(bulletins); + + // show the tooltip + if (nf.Common.isDefinedAndNotNull(tooltip)) { + bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, { + content: tooltip, + position: { + container: $('#summary'), + at: 'bottom right', + my: 'top left', + adjust: { + x: 4, + y: 4 + } + } + })); + } + } + }); + + // initialize the cluster process group summary dialog + $('#cluster-process-group-summary-dialog').modal({ + headerText: 'Cluster Process Group Summary', + overlayBackground: false, + buttons: [{ + buttonText: 'Close', + handler: { + click: function () { + // clear the cluster processor summary dialog + $('#cluster-process-group-id').text(''); + $('#cluster-process-group-name').text(''); + + // close the dialog + this.modal('hide'); + } + } + }], + handler: { + close: function () { + // show the summary loading container + $('#summary-loading-container').show(); + } + } + }); + + // cluster process group refresh + nf.Common.addHoverEffect('#cluster-process-group-refresh-button', 'button-refresh', 'button-refresh-hover').click(function () { + loadClusterProcessGroupSummary($('#cluster-process-group-id').text()); + }); + + // initialize the cluster process groups column model + var clusterProcessGroupsColumnModel = [ + {id: 'node', field: 'node', name: 'Node', sortable: true, resizable: true}, + transferredColumn, + inputColumn, + ioColumn, + outputColumn, + sentColumn, + receivedColumn + ]; + + // initialize the options for the cluster processors table + var clusterProcessGroupsOptions = { + forceFitColumns: true, + enableTextSelectionOnCells: true, + enableCellNavigation: true, + enableColumnReorder: false, + autoEdit: false, + multiSelect: false + }; + + // initialize the dataview + var clusterProcessGroupsData = new Slick.Data.DataView({ + inlineFilters: false + }); + clusterProcessGroupsData.setItems([]); + + // initialize the sort + sort('cluster-processor-summary-table', { + columnId: 'node', + sortAsc: true + }, clusterProcessGroupsData); + + // initialize the grid + var clusterProcessGroupsGrid = new Slick.Grid('#cluster-process-group-summary-table', clusterProcessGroupsData, clusterProcessGroupsColumnModel, clusterProcessGroupsOptions); + clusterProcessGroupsGrid.setSelectionModel(new Slick.RowSelectionModel()); + clusterProcessGroupsGrid.registerPlugin(new Slick.AutoTooltips()); + clusterProcessGroupsGrid.setSortColumn('node', true); + clusterProcessGroupsGrid.onSort.subscribe(function (e, args) { + sort('cluster-process-group-summary-table', { + columnId: args.sortCol.field, + sortAsc: args.sortAsc + }, clusterProcessGroupsData); + }); + + // wire up the dataview to the grid + clusterProcessGroupsData.onRowCountChanged.subscribe(function (e, args) { + clusterProcessGroupsGrid.updateRowCount(); + clusterProcessGroupsGrid.render(); + }); + clusterProcessGroupsData.onRowsChanged.subscribe(function (e, args) { + clusterProcessGroupsGrid.invalidateRows(args.rows); + clusterProcessGroupsGrid.render(); + }); + + // hold onto an instance of the grid + $('#cluster-process-group-summary-table').data('gridInstance', clusterProcessGroupsGrid); // define the column model for the summary table var inputPortsColumnModel = [ - {id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50}, + moreDetailsColumn, nameColumn, runStatusColumn, outputColumn @@ -917,18 +1192,18 @@ nf.SummaryTable = (function () { // show the tooltip if (nf.Common.isDefinedAndNotNull(tooltip)) { - bulletinIcon.qtip($.extend({ + bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, { content: tooltip, position: { - target: 'mouse', - viewport: $(window), + container: $('#summary'), + at: 'bottom right', + my: 'top left', adjust: { - x: 8, - y: 8, - method: 'flipinvert flipinvert' + x: 4, + y: 4 } } - }, nf.Common.config.tooltipConfig)); + })); } } }); @@ -1019,7 +1294,7 @@ nf.SummaryTable = (function () { // define the column model for the summary table var outputPortsColumnModel = [ - {id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50}, + moreDetailsColumn, nameColumn, runStatusColumn, inputColumn @@ -1139,18 +1414,18 @@ nf.SummaryTable = (function () { // show the tooltip if (nf.Common.isDefinedAndNotNull(tooltip)) { - bulletinIcon.qtip($.extend({ + bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, { content: tooltip, position: { - target: 'mouse', - viewport: $(window), + container: $('#summary'), + at: 'bottom right', + my: 'top left', adjust: { - x: 8, - y: 8, - method: 'flipinvert flipinvert' + x: 4, + y: 4 } } - }, nf.Common.config.tooltipConfig)); + })); } } }); @@ -1266,8 +1541,6 @@ nf.SummaryTable = (function () { var transmissionStatusColumn = {id: 'transmissionStatus', field: 'transmissionStatus', name: 'Transmitting', formatter: transmissionStatusFormatter, sortable: true, resizable: true}; var targetUriColumn = {id: 'targetUri', field: 'targetUri', name: 'Target URI', sortable: true, resizable: true}; - var sentColumn = {id: 'sent', field: 'sent', name: 'Sent / Size 5 min', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true}; - var receivedColumn = {id: 'received', field: 'received', name: 'Received / Size 5 min', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true}; // define the column model for the summary table var remoteProcessGroupsColumnModel = [ @@ -1405,18 +1678,18 @@ nf.SummaryTable = (function () { // show the tooltip if (nf.Common.isDefinedAndNotNull(tooltip)) { - bulletinIcon.qtip($.extend({ + bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, { content: tooltip, position: { - target: 'mouse', - viewport: $(window), + container: $('#summary'), + at: 'bottom right', + my: 'top left', adjust: { - x: 8, - y: 8, - method: 'flipinvert flipinvert' + x: 4, + y: 4 } } - }, nf.Common.config.tooltipConfig)); + })); } } }); @@ -1607,7 +1880,7 @@ nf.SummaryTable = (function () { var bQueueSize = nf.Common.parseSize(b['queuedSize']); return aQueueSize - bQueueSize; } - } else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output') { + } else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output' || sortDetails.columnId === 'transferred') { var aSplit = a[sortDetails.columnId].split(/ \/ /); var bSplit = b[sortDetails.columnId].split(/ \/ /); var mod = sortState[tableId].count % 4; @@ -1670,6 +1943,8 @@ nf.SummaryTable = (function () { $('#' + tableId + ' span.sent-size-title').removeClass('sorted'); $('#' + tableId + ' span.received-title').removeClass('sorted'); $('#' + tableId + ' span.received-size-title').removeClass('sorted'); + $('#' + tableId + ' span.transferred-title').removeClass('sorted'); + $('#' + tableId + ' span.transferred-size-title').removeClass('sorted'); // update/reset the count as appropriate if (sortState[tableId].prevColumn !== sortDetails.columnId) { @@ -1809,12 +2084,13 @@ nf.SummaryTable = (function () { * * @argument {array} processorItems The processor data * @argument {array} connectionItems The connection data + * @argument {array} processGroupItems The process group data * @argument {array} inputPortItems The input port data * @argument {array} outputPortItems The input port data * @argument {array} remoteProcessGroupItems The remote process group data * @argument {object} processGroupStatus The process group status */ - var populateProcessGroupStatus = function (processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) { + var populateProcessGroupStatus = function (processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) { // add the processors to the summary grid $.each(processGroupStatus.processorStatus, function (i, procStatus) { processorItems.push(procStatus); @@ -1839,10 +2115,13 @@ nf.SummaryTable = (function () { $.each(processGroupStatus.remoteProcessGroupStatus, function (i, rpgStatus) { remoteProcessGroupItems.push(rpgStatus); }); + + // add the process group status as well + processGroupItems.push(processGroupStatus); // add any child group's status $.each(processGroupStatus.processGroupStatus, function (i, childProcessGroup) { - populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup); + populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup); }); }; @@ -1975,6 +2254,63 @@ nf.SummaryTable = (function () { }).fail(nf.Common.handleAjaxError); }; + /** + * Loads the cluster input port details dialog for the specified processor. + * + * @argument {string} rowId The row id + */ + var loadClusterProcessGroupSummary = function (rowId) { + // get the summary + $.ajax({ + type: 'GET', + url: config.urls.clusterProcessGroup + encodeURIComponent(rowId) + '/status', + data: { + verbose: true + }, + dataType: 'json' + }).done(function (response) { + if (nf.Common.isDefinedAndNotNull(response.clusterProcessGroupStatus)) { + var clusterProcessGroupStatus = response.clusterProcessGroupStatus; + + var clusterProcessGroupsGrid = $('#cluster-process-group-summary-table').data('gridInstance'); + var clusterProcessGroupsData = clusterProcessGroupsGrid.getData(); + + var clusterProcessGroups = []; + + // populate the table + $.each(clusterProcessGroupStatus.nodeProcessGroupStatus, function (i, nodeProcessGroupStatus) { + clusterProcessGroups.push({ + id: nodeProcessGroupStatus.node.nodeId, + node: nodeProcessGroupStatus.node.address + ':' + nodeProcessGroupStatus.node.apiPort, + activeThreadCount: nodeProcessGroupStatus.processGroupStatus.activeThreadCount, + transferred: nodeProcessGroupStatus.processGroupStatus.transferred, + input: nodeProcessGroupStatus.processGroupStatus.input, + queued: nodeProcessGroupStatus.processGroupStatus.queued, + queuedCount: nodeProcessGroupStatus.processGroupStatus.queuedCount, + queuedSize: nodeProcessGroupStatus.processGroupStatus.queuedSize, + output: nodeProcessGroupStatus.processGroupStatus.output, + read: nodeProcessGroupStatus.processGroupStatus.read, + written: nodeProcessGroupStatus.processGroupStatus.written, + sent: nodeProcessGroupStatus.processGroupStatus.sent, + received: nodeProcessGroupStatus.processGroupStatus.received + }); + }); + + // update the input ports + clusterProcessGroupsData.setItems(clusterProcessGroups); + clusterProcessGroupsData.reSort(); + clusterProcessGroupsGrid.invalidate(); + + // populate the input port details + $('#cluster-process-group-name').text(clusterProcessGroupStatus.processGroupName).ellipsis(); + $('#cluster-process-group-id').text(clusterProcessGroupStatus.processGroupId); + + // update the stats last refreshed timestamp + $('#cluster-process-group-summary-last-refreshed').text(clusterProcessGroupStatus.statsLastRefreshed); + } + }).fail(nf.Common.handleAjaxError); + }; + /** * Loads the cluster input port details dialog for the specified processor. * @@ -2126,14 +2462,17 @@ nf.SummaryTable = (function () { }; return { + /** * URL for loading system diagnostics. */ systemDiagnosticsUrl: null, + /** * URL for loading the summary. */ url: null, + /** * Initializes the status table. * @@ -2157,6 +2496,7 @@ nf.SummaryTable = (function () { }); }).promise(); }, + /** * Update the size of the grid based on its container's current size. */ @@ -2170,6 +2510,11 @@ nf.SummaryTable = (function () { if (nf.Common.isDefinedAndNotNull(connectionsGrid)) { connectionsGrid.resizeCanvas(); } + + var processGroupsGrid = $('#process-group-summary-table').data('gridInstance'); + if (nf.Common.isDefinedAndNotNull(processGroupsGrid)) { + processGroupsGrid.resizeCanvas(); + } var inputPortGrid = $('#input-port-summary-table').data('gridInstance'); if (nf.Common.isDefinedAndNotNull(connectionsGrid)) { @@ -2186,6 +2531,7 @@ nf.SummaryTable = (function () { remoteProcessGroupGrid.resizeCanvas(); } }, + /** * Load the processor status table. */ @@ -2212,6 +2558,14 @@ nf.SummaryTable = (function () { // get the connections grid/data (do not render bulletins) var connectionsGrid = $('#connection-summary-table').data('gridInstance'); var connectionsData = connectionsGrid.getData(); + + // remove any tooltips from the process group table + var processGroupGridElement = $('#process-group-summary-table'); + nf.Common.cleanUpTooltips(processGroupGridElement, 'img.has-bulletins'); + + // get the process group grid/data + var processGroupGrid = processGroupGridElement.data('gridInstance'); + var processGroupData = processGroupGrid.getData(); // remove any tooltips from the input port table var inputPortsGridElement = $('#input-port-summary-table'); @@ -2239,12 +2593,13 @@ nf.SummaryTable = (function () { var processorItems = []; var connectionItems = []; + var processGroupItems = []; var inputPortItems = []; var outputPortItems = []; var remoteProcessGroupItems = []; // populate the tables - populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus); + populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus); // update the processors processorsData.setItems(processorItems); @@ -2255,6 +2610,11 @@ nf.SummaryTable = (function () { connectionsData.setItems(connectionItems); connectionsData.reSort(); connectionsGrid.invalidate(); + + // update the process groups + processGroupData.setItems(processGroupItems); + processGroupData.reSort(); + processGroupGrid.invalidate(); // update the input ports inputPortsData.setItems(inputPortItems);