NIFI-524:

- Adding a Process Group tab to the summary table to show aggregate statistics of their comonents.
This commit is contained in:
Matt Gilman 2015-04-23 12:59:41 -04:00
parent 8bfc969142
commit ff831dc33d
16 changed files with 1002 additions and 123 deletions

View File

@ -43,6 +43,8 @@ public class ProcessGroupStatus implements Cloneable {
private long bytesReceived;
private int flowFilesSent;
private long bytesSent;
private int flowFilesTransferred;
private long bytesTransferred;
private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
@ -227,6 +229,22 @@ public class ProcessGroupStatus implements Cloneable {
this.bytesSent = bytesSent;
}
public int getFlowFilesTransferred() {
return flowFilesTransferred;
}
public void setFlowFilesTransferred(int flowFilesTransferred) {
this.flowFilesTransferred = flowFilesTransferred;
}
public long getBytesTransferred() {
return bytesTransferred;
}
public void setBytesTransferred(long bytesTransferred) {
this.bytesTransferred = bytesTransferred;
}
@Override
public ProcessGroupStatus clone() {
@ -248,6 +266,8 @@ public class ProcessGroupStatus implements Cloneable {
clonedObj.bytesReceived = bytesReceived;
clonedObj.flowFilesSent = flowFilesSent;
clonedObj.bytesSent = bytesSent;
clonedObj.flowFilesTransferred = flowFilesTransferred;
clonedObj.bytesTransferred = bytesTransferred;
if (connectionStatus != null) {
final Collection<ConnectionStatus> statusList = new ArrayList<>();
@ -317,6 +337,18 @@ public class ProcessGroupStatus implements Cloneable {
builder.append(creationTimestamp);
builder.append(", activeThreadCount=");
builder.append(activeThreadCount);
builder.append(", flowFilesTransferred=");
builder.append(flowFilesTransferred);
builder.append(", bytesTransferred=");
builder.append(bytesTransferred);
builder.append(", flowFilesReceived=");
builder.append(flowFilesReceived);
builder.append(", bytesReceived=");
builder.append(bytesReceived);
builder.append(", flowFilesSent=");
builder.append(flowFilesSent);
builder.append(", bytesSent=");
builder.append(bytesSent);
builder.append(",\n\tconnectionStatus=");
for (final ConnectionStatus status : connectionStatus) {
@ -374,6 +406,12 @@ public class ProcessGroupStatus implements Cloneable {
target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
// connection status
// sort by id

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.status;
import java.util.Collection;
import java.util.Date;
import javax.xml.bind.annotation.XmlType;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.web.api.dto.util.TimeAdapter;
/**
* DTO for serializing the a process group's status across the cluster.
*/
@XmlType(name = "clusterProcessGroupStatus")
public class ClusterProcessGroupStatusDTO {
private Collection<NodeProcessGroupStatusDTO> nodeProcessGroupStatus;
private Date statsLastRefreshed;
private String processGroupId;
private String processGroupName;
/**
* The time the status were last refreshed.
*
* @return The time the status were last refreshed
*/
@XmlJavaTypeAdapter(TimeAdapter.class)
public Date getStatsLastRefreshed() {
return statsLastRefreshed;
}
public void setStatsLastRefreshed(Date statsLastRefreshed) {
this.statsLastRefreshed = statsLastRefreshed;
}
/**
* The process group id.
*
* @return The process group id
*/
public String getProcessGroupId() {
return processGroupId;
}
public void setProcessGroupId(String processGroupId) {
this.processGroupId = processGroupId;
}
/**
* The process group name.
*
* @return The process group name
*/
public String getProcessGroupName() {
return processGroupName;
}
public void setProcessGroupName(String processGroupName) {
this.processGroupName = processGroupName;
}
/**
* Collection of node process group status DTO.
*
* @return The collection of node process group status DTO
*/
public Collection<NodeProcessGroupStatusDTO> getNodeProcessGroupStatus() {
return nodeProcessGroupStatus;
}
public void setNodeProcessGroupStatus(Collection<NodeProcessGroupStatusDTO> nodeProcessGroupStatus) {
this.nodeProcessGroupStatus = nodeProcessGroupStatus;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.status;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.NodeDTO;
/**
* DTO for serializing the process group status for a particular node.
*/
@XmlType(name = "nodeProcessGroupStatus")
public class NodeProcessGroupStatusDTO {
private NodeDTO node;
private ProcessGroupStatusDTO processGroupStatus;
/**
* The node.
*
* @return The node DTO
*/
public NodeDTO getNode() {
return node;
}
public void setNode(NodeDTO node) {
this.node = node;
}
/**
* The process group's status.
*
* @return The process group status
*/
public ProcessGroupStatusDTO getProcessGroupStatus() {
return processGroupStatus;
}
public void setProcessGroupStatus(ProcessGroupStatusDTO processGroupStatus) {
this.processGroupStatus = processGroupStatus;
}
}

View File

@ -38,10 +38,15 @@ public class ProcessGroupStatusDTO extends StatusDTO {
private Collection<PortStatusDTO> outputPortStatus;
private String input;
private String queuedCount;
private String queuedSize;
private String queued;
private String read;
private String written;
private String output;
private String transferred;
private String received;
private String sent;
private Integer activeThreadCount;
private Date statsLastRefreshed;
@ -171,6 +176,74 @@ public class ProcessGroupStatusDTO extends StatusDTO {
this.output = output;
}
/**
* The transferred stats for this process group. This represents the
* count/size of flowfiles transferred to/from queues.
*
* @return The transferred status for this process group
*/
public String getTransferred() {
return transferred;
}
public void setTransferred(String transferred) {
this.transferred = transferred;
}
/**
* The received stats for this process group. This represents the count/size
* of flowfiles received.
*
* @return The received stats for this process group
*/
public String getReceived() {
return received;
}
public void setReceived(String received) {
this.received = received;
}
/**
* The sent stats for this process group. This represents the count/size of
* flowfiles sent.
*
* @return The sent stats for this process group
*/
public String getSent() {
return sent;
}
public void setSent(String sent) {
this.sent = sent;
}
/**
* The queued count for this process group.
*
* @return The queued count for this process group
*/
public String getQueuedCount() {
return queuedCount;
}
public void setQueuedCount(String queuedCount) {
this.queuedCount = queuedCount;
}
/**
* The queued size for this process group.
*
* @return The queued size for this process group
*/
public String getQueuedSize() {
return queuedSize;
}
public void setQueuedSize(String queuedSize) {
this.queuedSize = queuedSize;
}
/**
* The queued stats for this process group.
*

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
/**
* A serialized representation of this class can be placed in the entity body of
* a request or response to or from the API. This particular entity holds a
* reference to a ClusterProcessGroupStatusDTO.
*/
@XmlRootElement(name = "clusterProcessGroupStatusEntity")
public class ClusterProcessGroupStatusEntity extends Entity {
private ClusterProcessGroupStatusDTO clusterProcessGroupStatus;
/**
* The ClusterProcessGroupStatusDTO that is being serialized.
*
* @return The ClusterProcessGroupStatusDTO object
*/
public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus() {
return clusterProcessGroupStatus;
}
public void setClusterProcessGroupStatus(ClusterProcessGroupStatusDTO clusterProcessGroupStatus) {
this.clusterProcessGroupStatus = clusterProcessGroupStatus;
}
}

View File

@ -2063,6 +2063,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
long bytesReceived = 0L;
int flowFilesSent = 0;
long bytesSent = 0L;
int flowFilesTransferred = 0;
long bytesTransferred = 0;
// set status for processors
final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
@ -2096,6 +2098,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
bytesReceived += childGroupStatus.getBytesReceived();
flowFilesSent += childGroupStatus.getFlowFilesSent();
bytesSent += childGroupStatus.getBytesSent();
flowFilesTransferred += childGroupStatus.getFlowFilesTransferred();
bytesTransferred += childGroupStatus.getBytesTransferred();
}
// set status for remote child groups
@ -2133,6 +2138,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
connStatus.setInputCount(connectionStatusReport.getFlowFilesIn());
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
}
if (StringUtils.isNotBlank(conn.getName())) {
@ -2303,6 +2311,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setBytesReceived(bytesReceived);
status.setFlowFilesSent(flowFilesSent);
status.setBytesSent(bytesSent);
status.setFlowFilesTransferred(flowFilesTransferred);
status.setBytesTransferred(bytesTransferred);
return status;
}

View File

@ -63,6 +63,7 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
@ -774,8 +775,7 @@ public interface NiFiServiceFacade {
void verifyUpdateRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO);
/**
* Verifies the specified remote process group can update the specified
* remote input port.
* Verifies the specified remote process group can update the specified remote input port.
*
* @param groupId The id of the parent group
* @param remoteProcessGroupId The id of the remote process group
@ -784,8 +784,7 @@ public interface NiFiServiceFacade {
void verifyUpdateRemoteProcessGroupInputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO);
/**
* Verifies the specified remote process group can update the specified
* remote output port.
* Verifies the specified remote process group can update the specified remote output port.
*
* @param groupId The id of the parent group
* @param remoteProcessGroupId The id of the remote process group
@ -977,13 +976,12 @@ public interface NiFiServiceFacade {
* Gets the specified controller service.
*
* @param controllerServiceId id
* @return service
* @return service
*/
ControllerServiceDTO getControllerService(String controllerServiceId);
/**
* Get the descriptor for the specified property of the specified controller
* service.
* Get the descriptor for the specified property of the specified controller service.
*
* @param id id
* @param property property
@ -1037,8 +1035,7 @@ public interface NiFiServiceFacade {
void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO);
/**
* Verifies the referencing components of the specified controller service
* can be updated.
* Verifies the referencing components of the specified controller service can be updated.
*
* @param controllerServiceId id
* @param scheduledState schedule state
@ -1081,8 +1078,7 @@ public interface NiFiServiceFacade {
ReportingTaskDTO getReportingTask(String reportingTaskId);
/**
* Get the descriptor for the specified property of the specified reporting
* task.
* Get the descriptor for the specified property of the specified reporting task.
*
* @param id id
* @param property property
@ -1196,8 +1192,7 @@ public interface NiFiServiceFacade {
void verifyUpdateSnippet(SnippetDTO snippetDto);
/**
* If group id is specified, moves the specified snippet to the specified
* group.
* If group id is specified, moves the specified snippet to the specified group.
*
* @param revision revision
* @param snippetDto snippet
@ -1256,8 +1251,7 @@ public interface NiFiServiceFacade {
void invalidateUser(String userId);
/**
* Invalidates the specified user accounts and all accounts associated with
* this group.
* Invalidates the specified user accounts and all accounts associated with this group.
*
* @param userGroup group
* @param userIds id
@ -1272,8 +1266,7 @@ public interface NiFiServiceFacade {
void deleteUser(String userId);
/**
* Updates a user group with the specified group and comprised of the
* specified users.
* Updates a user group with the specified group and comprised of the specified users.
*
* @param userGroup group
* @return group
@ -1298,8 +1291,7 @@ public interface NiFiServiceFacade {
// Cluster methods
// ----------------------------------------
/**
* @return true if controller is connected or trying to connect to the
* cluster
* @return true if controller is connected or trying to connect to the cluster
*/
boolean isClustered();
@ -1369,8 +1361,7 @@ public interface NiFiServiceFacade {
/**
* @param processorId id
* @return the processor status history for each node connected to the
* cluster
* @return the processor status history for each node connected to the cluster
*/
ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId);
@ -1384,28 +1375,34 @@ public interface NiFiServiceFacade {
/**
* @param connectionId id
* @return the connection status history for each node connected to the
* cluster
* @return the connection status history for each node connected to the cluster
*/
ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId);
/**
* @param processGroupId id
* @return the process group status history for each node connected to the
* cluster
* @return the process group status history for each node connected to the cluster
*/
ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId);
/**
* @param remoteProcessGroupId id
* @return the remote process group status history for each node connected
* to the cluster
* Returns a process group's status for each node connected to the cluster.
*
* @param processorId a process group identifier
* @return The cluster process group status transfer object.
*/
ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId);
/**
* Returns the remote process group status history for each node connected to the cluster.
*
* @param remoteProcessGroupId a remote process group identifier
* @return The cluster status history
*/
ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId);
/**
* Returns a remote process group's status for each node connected to the
* cluster.
* Returns a remote process group's status for each node connected to the cluster.
*
* @param remoteProcessGroupId a remote process group identifier
* @return The cluster remote process group status transfer object.

View File

@ -163,6 +163,8 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.slf4j.Logger;
@ -2447,6 +2449,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return clusterConnectionStatusDto;
}
private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) {
ProcessGroupStatus processGroupStatus = null;
if (processGroupId.equals(groupStatus.getId())) {
processGroupStatus = groupStatus;
}
if (processGroupStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
processGroupStatus = findNodeProcessGroupStatus(status, processGroupId);
if (processGroupStatus != null) {
break;
}
}
}
return processGroupStatus;
}
@Override
public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) {
final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO();
clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>());
// set the current time
clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// attempt to find the process group stats for this node
final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId);
// sanity check that we have status for this process group
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId));
}
if (firstNode) {
clusterProcessGroupStatusDto.setProcessGroupId(processGroupId);
clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName());
firstNode = false;
}
// create node process group status dto
final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO();
clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO);
// populate node process group status dto
final String nodeId = node.getNodeId().getId();
nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus));
}
return clusterProcessGroupStatusDto;
}
private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) {
PortStatus portStatus = null;

View File

@ -69,6 +69,8 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.security.access.prepost.PreAuthorize;
import com.sun.jersey.api.core.ResourceContext;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
import org.codehaus.enunciate.jaxrs.TypeHint;
/**
@ -95,9 +97,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the status of this NiFi cluster.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @return A clusterStatusEntity
*/
@GET
@ -144,12 +144,9 @@ public class ClusterResource extends ApplicationResource {
}
/**
* Gets the contents of this NiFi cluster. This includes all nodes and their
* status.
* Gets the contents of this NiFi cluster. This includes all nodes and their status.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @return A clusterEntity
*/
@GET
@ -231,9 +228,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the processor.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A processorEntity
*/
@ -267,11 +262,8 @@ public class ClusterResource extends ApplicationResource {
* Updates the processors annotation data.
*
* @param httpServletRequest
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param version The revision is used to verify the client is working with the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param processorId The id of the processor.
* @param annotationData The annotation data to set.
* @return A processorEntity.
@ -395,9 +387,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the processor status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusEntity
*/
@ -431,9 +421,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the processor status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusHistoryEntity
*/
@ -466,9 +454,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the connection status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusEntity
*/
@ -502,9 +488,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the connections status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusHistoryEntity
*/
@ -534,12 +518,44 @@ public class ClusterResource extends ApplicationResource {
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the process group status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the process group
* @return A clusterProcessGroupStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/process-groups/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterConnectionStatusEntity.class)
public Response getProcessGroupStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity();
entity.setClusterProcessGroupStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the process group status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the process group
* @return A clusterProcessGroupStatusHistoryEntity
*/
@ -572,9 +588,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the remote process group status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the remote process group
* @return A clusterRemoteProcessGroupStatusEntity
*/
@ -608,9 +622,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the input port status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the input port
* @return A clusterPortStatusEntity
*/
@ -644,9 +656,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the output port status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the output port
* @return A clusterPortStatusEntity
*/
@ -680,9 +690,7 @@ public class ClusterResource extends ApplicationResource {
/**
* Gets the remote process group status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the processor
* @return A clusterRemoteProcessGroupStatusHistoryEntity
*/

View File

@ -506,13 +506,21 @@ public final class DtoFactory {
processGroupStatusDto.setId(processGroupStatus.getId());
processGroupStatusDto.setName(processGroupStatus.getName());
processGroupStatusDto.setStatsLastRefreshed(new Date(processGroupStatus.getCreationTimestamp()));
processGroupStatusDto.setQueued(formatCount(processGroupStatus.getQueuedCount()) + " / " + formatDataSize(processGroupStatus.getQueuedContentSize()));
processGroupStatusDto.setRead(formatDataSize(processGroupStatus.getBytesRead()));
processGroupStatusDto.setWritten(formatDataSize(processGroupStatus.getBytesWritten()));
processGroupStatusDto.setInput(formatCount(processGroupStatus.getInputCount()) + " / " + formatDataSize(processGroupStatus.getInputContentSize()));
processGroupStatusDto.setOutput(formatCount(processGroupStatus.getOutputCount()) + " / " + formatDataSize(processGroupStatus.getOutputContentSize()));
processGroupStatusDto.setTransferred(formatCount(processGroupStatus.getFlowFilesTransferred()) + " / " + formatDataSize(processGroupStatus.getBytesTransferred()));
processGroupStatusDto.setSent(formatCount(processGroupStatus.getFlowFilesSent()) + " / " + formatDataSize(processGroupStatus.getBytesSent()));
processGroupStatusDto.setReceived(formatCount(processGroupStatus.getFlowFilesReceived()) + " / " + formatDataSize(processGroupStatus.getBytesReceived()));
processGroupStatusDto.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
final String queuedCount = FormatUtils.formatCount(processGroupStatus.getQueuedCount());
final String queuedSize = FormatUtils.formatDataSize(processGroupStatus.getQueuedContentSize());
processGroupStatusDto.setQueuedCount(queuedCount);
processGroupStatusDto.setQueuedSize(queuedSize);
processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize);
final Map<String, StatusDTO> componentStatusDtoMap = new HashMap<>();
// processor status
@ -1504,8 +1512,7 @@ public final class DtoFactory {
}
/**
* Creates a ProvenanceEventNodeDTO for the specified
* ProvenanceEventLineageNode.
* Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode.
*
* @param node
* @return
@ -2158,9 +2165,8 @@ public final class DtoFactory {
/**
*
* @param original
* @param deep if <code>true</code>, all Connections, ProcessGroups, Ports,
* Processors, etc. will be copied. If <code>false</code>, the copy will
* have links to the same objects referenced by <code>original</code>.
* @param deep if <code>true</code>, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If <code>false</code>, the copy will have links to the same objects referenced by
* <code>original</code>.
*
* @return
*/

View File

@ -74,6 +74,7 @@
<jsp:include page="/WEB-INF/partials/summary/cluster-output-port-summary-dialog.jsp"/>
<jsp:include page="/WEB-INF/partials/summary/cluster-remote-process-group-summary-dialog.jsp"/>
<jsp:include page="/WEB-INF/partials/summary/cluster-connection-summary-dialog.jsp"/>
<jsp:include page="/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp"/>
<jsp:include page="/WEB-INF/partials/summary/system-diagnostics-dialog.jsp"/>
<jsp:include page="/WEB-INF/partials/summary/view-single-node-dialog.jsp"/>
<div id="faded-background"></div>

View File

@ -0,0 +1,36 @@
<%--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--%>
<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
<div id="cluster-process-group-summary-dialog">
<div class="dialog-content">
<div id="cluster-process-group-summary-header">
<div id="cluster-process-group-refresh-button" class="summary-refresh pointer" title="Refresh"></div>
<div id="cluster-process-group-summary-last-refreshed-container">
Last updated:&nbsp;<span id="cluster-process-group-summary-last-refreshed"></span>
</div>
<div id="cluster-process-group-summary-loading-container" class="loading-container"></div>
<div id="cluster-process-group-details-container">
<div id="cluster-process-group-icon"></div>
<div id="cluster-process-group-details">
<div id="cluster-process-group-name"></div>
<div id="cluster-process-group-id"></div>
</div>
</div>
</div>
<div id="cluster-process-group-summary-table"></div>
</div>
</div>

View File

@ -51,6 +51,9 @@
<div id="connection-summary-tab-content" class="configuration-tab">
<div id="connection-summary-table" class="summary-table"></div>
</div>
<div id="process-group-summary-tab-content" class="configuration-tab">
<div id="process-group-summary-table" class="summary-table"></div>
</div>
<div id="input-port-summary-tab-content" class="configuration-tab">
<div id="input-port-summary-table" class="summary-table"></div>
</div>

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Status Styles
*/
@ -386,6 +387,12 @@ span.sorted {
text-decoration: underline;
}
/* tooltips in the summary table */
#summary .nifi-tooltip {
max-width: 500px;
}
/* cluster processor summary table */
#cluster-processor-summary-dialog {
@ -765,4 +772,80 @@ span.sorted {
white-space: nowrap;
overflow: hidden;
width: 200px;
}
/* cluster process group summary table */
#cluster-process-group-summary-dialog {
display: none;
width: 778px;
height: 450px;
z-index: 1301;
}
#cluster-process-group-summary-table {
width: 758px;
height: 300px;
border-bottom: 1px solid #666;
}
#cluster-process-group-summary-header {
height: 26px;
color: #666;
font-weight: normal;
margin-bottom: 1px;
}
#cluster-process-group-refresh-button {
height: 24px;
width: 26px;
float: left;
}
#cluster-process-group-summary-last-refreshed-container {
float: left;
margin-top: 6px;
margin-left: 3px;
-webkit-user-select: none;
-moz-user-select: none;
}
#cluster-process-group-summary-last-refreshed {
font-weight: bold;
}
#cluster-process-group-summary-loading-container {
float: left;
width: 16px;
height: 16px;
background-color: transparent;
margin-top: 4px;
margin-left: 3px;
}
#cluster-process-group-details-container {
position: absolute;
right: 35px;
}
#cluster-process-group-icon {
background-image: url(../images/iconProcessGroup.png);
width: 29px;
height: 20px;
float: left;
margin-right: 5px;
margin-top: 1px;
}
#cluster-process-group-details {
float: left;
}
#cluster-process-group-name {
margin-bottom: 2px;
color: #000;
font-weight: bold;
white-space: nowrap;
overflow: hidden;
width: 200px;
}

View File

@ -31,6 +31,7 @@ nf.SummaryTable = (function () {
processGroups: '../nifi-api/controller/process-groups/',
clusterProcessor: '../nifi-api/cluster/processors/',
clusterConnection: '../nifi-api/cluster/connections/',
clusterProcessGroup: '../nifi-api/cluster/process-groups/',
clusterInputPort: '../nifi-api/cluster/input-ports/',
clusterOutputPort: '../nifi-api/cluster/output-ports/',
clusterRemoteProcessGroup: '../nifi-api/cluster/remote-process-groups/',
@ -130,6 +131,9 @@ nf.SummaryTable = (function () {
}, {
name: 'Connections',
tabContentId: 'connection-summary-tab-content'
}, {
name: 'Process Groups',
tabContentId: 'process-group-summary-tab-content'
}],
select: function () {
var tab = $(this).text();
@ -190,12 +194,12 @@ nf.SummaryTable = (function () {
if (nf.Common.isDefinedAndNotNull(inputPortsGrid)) {
inputPortsGrid.resizeCanvas();
// update the total number of connections
// update the total number of input ports
$('#displayed-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength()));
$('#total-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength()));
}
// update the combo for connections
// update the combo for input ports
$('#summary-filter-type').combo({
options: [{
text: 'by name',
@ -211,12 +215,12 @@ nf.SummaryTable = (function () {
if (nf.Common.isDefinedAndNotNull(outputPortsGrid)) {
outputPortsGrid.resizeCanvas();
// update the total number of connections
// update the total number of output ports
$('#displayed-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength()));
$('#total-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength()));
}
// update the combo for connections
// update the combo for output ports
$('#summary-filter-type').combo({
options: [{
text: 'by name',
@ -226,18 +230,18 @@ nf.SummaryTable = (function () {
applyFilter();
}
});
} else {
} else if (tab === 'Remote Process Groups') {
// ensure the connection table is size properly
var remoteProcessGroupsGrid = $('#remote-process-group-summary-table').data('gridInstance');
if (nf.Common.isDefinedAndNotNull(remoteProcessGroupsGrid)) {
remoteProcessGroupsGrid.resizeCanvas();
// update the total number of connections
// update the total number of remote process groups
$('#displayed-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength()));
$('#total-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength()));
}
// update the combo for connections
// update the combo for remote process groups
$('#summary-filter-type').combo({
options: [{
text: 'by name',
@ -250,6 +254,27 @@ nf.SummaryTable = (function () {
applyFilter();
}
});
} else {
// ensure the connection table is size properly
var processGroupGrid = $('#process-group-summary-table').data('gridInstance');
if (nf.Common.isDefinedAndNotNull(processGroupGrid)) {
processGroupGrid.resizeCanvas();
// update the total number of process groups
$('#displayed-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength()));
$('#total-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength()));
}
// update the combo for process groups
$('#summary-filter-type').combo({
options: [{
text: 'by name',
value: 'name'
}],
select: function (option) {
applyFilter();
}
});
}
// reset the filter
@ -458,18 +483,18 @@ nf.SummaryTable = (function () {
// show the tooltip
if (nf.Common.isDefinedAndNotNull(tooltip)) {
bulletinIcon.qtip($.extend({
bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
content: tooltip,
position: {
target: 'mouse',
viewport: $(window),
container: $('#summary'),
at: 'bottom right',
my: 'top left',
adjust: {
x: 8,
y: 8,
method: 'flipinvert flipinvert'
x: 4,
y: 4
}
}
}, nf.Common.config.tooltipConfig));
}));
}
}
});
@ -783,7 +808,7 @@ nf.SummaryTable = (function () {
// hold onto an instance of the grid
$('#cluster-connection-summary-table').data('gridInstance', clusterConnectionsGrid);
// define a custom formatter for showing more port details
// define a custom formatter for showing more port/group details
var moreDetails = function (row, cell, value, columnDef, dataContext) {
var markup = '';
@ -794,10 +819,260 @@ nf.SummaryTable = (function () {
return markup;
};
var moreDetailsColumn = {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50};
var transferredColumn = {id: 'transferred', field: 'transferred', name: '<span class="transferred-title">Transferred</span>&nbsp;/&nbsp;<span class="transferred-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size transferred to and from connections in the last 5 min', resizable: true, defaultSortAsc: false, sortable: true};
var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
// define the column model for the summary table
var processGroupsColumnModel = [
moreDetailsColumn,
{id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true, formatter: valueFormatter},
transferredColumn,
inputColumn,
ioColumn,
outputColumn,
sentColumn,
receivedColumn
];
// add an action column if appropriate
if (isClustered || isInShell || nf.Common.SUPPORTS_SVG) {
// define how the column is formatted
var processGroupActionFormatter = function (row, cell, value, columnDef, dataContext) {
var markup = '';
if (isInShell && dataContext.groupId !== null) {
markup += '<img src="images/iconGoTo.png" title="Go To" class="pointer go-to" style="margin-top: 2px;"/>&nbsp;';
}
if (nf.Common.SUPPORTS_SVG) {
if (isClustered) {
markup += '<img src="images/iconChart.png" title="Show History" class="pointer show-cluster-process-group-status-history" style="margin-top: 2px;"/>&nbsp;';
} else {
markup += '<img src="images/iconChart.png" title="Show History" class="pointer show-process-group-status-history" style="margin-top: 2px;"/>&nbsp;';
}
}
if (isClustered) {
markup += '<img src="images/iconClusterSmall.png" title="Show Details" class="pointer show-cluster-process-group-summary" style="margin-top: 2px;"/>&nbsp;';
}
return markup;
};
// define the action column for clusters and within the shell
processGroupsColumnModel.push({id: 'actions', name: '&nbsp;', formatter: processGroupActionFormatter, resizable: false, sortable: false, width: 75, maxWidth: 75});
}
// initialize the templates table
var processGroupsOptions = {
forceFitColumns: true,
enableTextSelectionOnCells: true,
enableCellNavigation: true,
enableColumnReorder: false,
autoEdit: false,
multiSelect: false
};
// initialize the dataview
var processGroupsData = new Slick.Data.DataView({
inlineFilters: false
});
processGroupsData.setItems([]);
processGroupsData.setFilterArgs({
searchString: '',
property: 'name'
});
processGroupsData.setFilter(filter);
// initialize the sort
sort('process-group-summary-table', {
columnId: 'name',
sortAsc: true
}, processGroupsData);
// initialize the grid
var processGroupsGrid = new Slick.Grid('#process-group-summary-table', processGroupsData, processGroupsColumnModel, processGroupsOptions);
processGroupsGrid.setSelectionModel(new Slick.RowSelectionModel());
processGroupsGrid.registerPlugin(new Slick.AutoTooltips());
processGroupsGrid.setSortColumn('name', true);
processGroupsGrid.onSort.subscribe(function (e, args) {
sort('process-group-summary-table', {
columnId: args.sortCol.field,
sortAsc: args.sortAsc
}, processGroupsData);
});
// configure a click listener
processGroupsGrid.onClick.subscribe(function (e, args) {
var target = $(e.target);
// get the node at this row
var item = processGroupsData.getItem(args.row);
// determine the desired action
if (processGroupsGrid.getColumns()[args.cell].id === 'actions') {
if (target.hasClass('go-to')) {
if (nf.Common.isDefinedAndNotNull(parent.nf) && nf.Common.isDefinedAndNotNull(parent.nf.CanvasUtils) && nf.Common.isDefinedAndNotNull(parent.nf.Shell)) {
parent.nf.CanvasUtils.enterGroup(item.id);
parent.$('#shell-close-button').click();
}
} else if (target.hasClass('show-cluster-process-group-status-history')) {
nf.StatusHistory.showClusterProcessGroupChart(item.groupId, item.id);
} else if (target.hasClass('show-process-group-status-history')) {
nf.StatusHistory.showStandaloneProcessGroupChart(item.groupId, item.id);
} else if (target.hasClass('show-cluster-process-group-summary')) {
// load the cluster processor summary
loadClusterProcessGroupSummary(item.id);
// hide the summary loading indicator
$('#summary-loading-container').hide();
// show the dialog
$('#cluster-process-group-summary-dialog').modal('show');
}
}
});
// wire up the dataview to the grid
processGroupsData.onRowCountChanged.subscribe(function (e, args) {
processGroupsGrid.updateRowCount();
processGroupsGrid.render();
// update the total number of displayed process groups if necessary
if ($('#process-group-summary-table').is(':visible')) {
$('#displayed-items').text(nf.Common.formatInteger(args.current));
}
});
processGroupsData.onRowsChanged.subscribe(function (e, args) {
processGroupsGrid.invalidateRows(args.rows);
processGroupsGrid.render();
});
// hold onto an instance of the grid
$('#process-group-summary-table').data('gridInstance', processGroupsGrid).on('mouseenter', 'div.slick-cell', function (e) {
var bulletinIcon = $(this).find('img.has-bulletins');
if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
var processGroupId = $(this).find('span.row-id').text();
// get the status item
var item = processGroupsData.getItemById(processGroupId);
// format the tooltip
var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
var tooltip = nf.Common.formatUnorderedList(bulletins);
// show the tooltip
if (nf.Common.isDefinedAndNotNull(tooltip)) {
bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
content: tooltip,
position: {
container: $('#summary'),
at: 'bottom right',
my: 'top left',
adjust: {
x: 4,
y: 4
}
}
}));
}
}
});
// initialize the cluster process group summary dialog
$('#cluster-process-group-summary-dialog').modal({
headerText: 'Cluster Process Group Summary',
overlayBackground: false,
buttons: [{
buttonText: 'Close',
handler: {
click: function () {
// clear the cluster processor summary dialog
$('#cluster-process-group-id').text('');
$('#cluster-process-group-name').text('');
// close the dialog
this.modal('hide');
}
}
}],
handler: {
close: function () {
// show the summary loading container
$('#summary-loading-container').show();
}
}
});
// cluster process group refresh
nf.Common.addHoverEffect('#cluster-process-group-refresh-button', 'button-refresh', 'button-refresh-hover').click(function () {
loadClusterProcessGroupSummary($('#cluster-process-group-id').text());
});
// initialize the cluster process groups column model
var clusterProcessGroupsColumnModel = [
{id: 'node', field: 'node', name: 'Node', sortable: true, resizable: true},
transferredColumn,
inputColumn,
ioColumn,
outputColumn,
sentColumn,
receivedColumn
];
// initialize the options for the cluster processors table
var clusterProcessGroupsOptions = {
forceFitColumns: true,
enableTextSelectionOnCells: true,
enableCellNavigation: true,
enableColumnReorder: false,
autoEdit: false,
multiSelect: false
};
// initialize the dataview
var clusterProcessGroupsData = new Slick.Data.DataView({
inlineFilters: false
});
clusterProcessGroupsData.setItems([]);
// initialize the sort
sort('cluster-processor-summary-table', {
columnId: 'node',
sortAsc: true
}, clusterProcessGroupsData);
// initialize the grid
var clusterProcessGroupsGrid = new Slick.Grid('#cluster-process-group-summary-table', clusterProcessGroupsData, clusterProcessGroupsColumnModel, clusterProcessGroupsOptions);
clusterProcessGroupsGrid.setSelectionModel(new Slick.RowSelectionModel());
clusterProcessGroupsGrid.registerPlugin(new Slick.AutoTooltips());
clusterProcessGroupsGrid.setSortColumn('node', true);
clusterProcessGroupsGrid.onSort.subscribe(function (e, args) {
sort('cluster-process-group-summary-table', {
columnId: args.sortCol.field,
sortAsc: args.sortAsc
}, clusterProcessGroupsData);
});
// wire up the dataview to the grid
clusterProcessGroupsData.onRowCountChanged.subscribe(function (e, args) {
clusterProcessGroupsGrid.updateRowCount();
clusterProcessGroupsGrid.render();
});
clusterProcessGroupsData.onRowsChanged.subscribe(function (e, args) {
clusterProcessGroupsGrid.invalidateRows(args.rows);
clusterProcessGroupsGrid.render();
});
// hold onto an instance of the grid
$('#cluster-process-group-summary-table').data('gridInstance', clusterProcessGroupsGrid);
// define the column model for the summary table
var inputPortsColumnModel = [
{id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50},
moreDetailsColumn,
nameColumn,
runStatusColumn,
outputColumn
@ -917,18 +1192,18 @@ nf.SummaryTable = (function () {
// show the tooltip
if (nf.Common.isDefinedAndNotNull(tooltip)) {
bulletinIcon.qtip($.extend({
bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
content: tooltip,
position: {
target: 'mouse',
viewport: $(window),
container: $('#summary'),
at: 'bottom right',
my: 'top left',
adjust: {
x: 8,
y: 8,
method: 'flipinvert flipinvert'
x: 4,
y: 4
}
}
}, nf.Common.config.tooltipConfig));
}));
}
}
});
@ -1019,7 +1294,7 @@ nf.SummaryTable = (function () {
// define the column model for the summary table
var outputPortsColumnModel = [
{id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50},
moreDetailsColumn,
nameColumn,
runStatusColumn,
inputColumn
@ -1139,18 +1414,18 @@ nf.SummaryTable = (function () {
// show the tooltip
if (nf.Common.isDefinedAndNotNull(tooltip)) {
bulletinIcon.qtip($.extend({
bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
content: tooltip,
position: {
target: 'mouse',
viewport: $(window),
container: $('#summary'),
at: 'bottom right',
my: 'top left',
adjust: {
x: 8,
y: 8,
method: 'flipinvert flipinvert'
x: 4,
y: 4
}
}
}, nf.Common.config.tooltipConfig));
}));
}
}
});
@ -1266,8 +1541,6 @@ nf.SummaryTable = (function () {
var transmissionStatusColumn = {id: 'transmissionStatus', field: 'transmissionStatus', name: 'Transmitting', formatter: transmissionStatusFormatter, sortable: true, resizable: true};
var targetUriColumn = {id: 'targetUri', field: 'targetUri', name: 'Target URI', sortable: true, resizable: true};
var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
// define the column model for the summary table
var remoteProcessGroupsColumnModel = [
@ -1405,18 +1678,18 @@ nf.SummaryTable = (function () {
// show the tooltip
if (nf.Common.isDefinedAndNotNull(tooltip)) {
bulletinIcon.qtip($.extend({
bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
content: tooltip,
position: {
target: 'mouse',
viewport: $(window),
container: $('#summary'),
at: 'bottom right',
my: 'top left',
adjust: {
x: 8,
y: 8,
method: 'flipinvert flipinvert'
x: 4,
y: 4
}
}
}, nf.Common.config.tooltipConfig));
}));
}
}
});
@ -1607,7 +1880,7 @@ nf.SummaryTable = (function () {
var bQueueSize = nf.Common.parseSize(b['queuedSize']);
return aQueueSize - bQueueSize;
}
} else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output') {
} else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output' || sortDetails.columnId === 'transferred') {
var aSplit = a[sortDetails.columnId].split(/ \/ /);
var bSplit = b[sortDetails.columnId].split(/ \/ /);
var mod = sortState[tableId].count % 4;
@ -1670,6 +1943,8 @@ nf.SummaryTable = (function () {
$('#' + tableId + ' span.sent-size-title').removeClass('sorted');
$('#' + tableId + ' span.received-title').removeClass('sorted');
$('#' + tableId + ' span.received-size-title').removeClass('sorted');
$('#' + tableId + ' span.transferred-title').removeClass('sorted');
$('#' + tableId + ' span.transferred-size-title').removeClass('sorted');
// update/reset the count as appropriate
if (sortState[tableId].prevColumn !== sortDetails.columnId) {
@ -1809,12 +2084,13 @@ nf.SummaryTable = (function () {
*
* @argument {array} processorItems The processor data
* @argument {array} connectionItems The connection data
* @argument {array} processGroupItems The process group data
* @argument {array} inputPortItems The input port data
* @argument {array} outputPortItems The input port data
* @argument {array} remoteProcessGroupItems The remote process group data
* @argument {object} processGroupStatus The process group status
*/
var populateProcessGroupStatus = function (processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) {
var populateProcessGroupStatus = function (processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) {
// add the processors to the summary grid
$.each(processGroupStatus.processorStatus, function (i, procStatus) {
processorItems.push(procStatus);
@ -1839,10 +2115,13 @@ nf.SummaryTable = (function () {
$.each(processGroupStatus.remoteProcessGroupStatus, function (i, rpgStatus) {
remoteProcessGroupItems.push(rpgStatus);
});
// add the process group status as well
processGroupItems.push(processGroupStatus);
// add any child group's status
$.each(processGroupStatus.processGroupStatus, function (i, childProcessGroup) {
populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup);
populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup);
});
};
@ -1975,6 +2254,63 @@ nf.SummaryTable = (function () {
}).fail(nf.Common.handleAjaxError);
};
/**
* Loads the cluster input port details dialog for the specified processor.
*
* @argument {string} rowId The row id
*/
var loadClusterProcessGroupSummary = function (rowId) {
// get the summary
$.ajax({
type: 'GET',
url: config.urls.clusterProcessGroup + encodeURIComponent(rowId) + '/status',
data: {
verbose: true
},
dataType: 'json'
}).done(function (response) {
if (nf.Common.isDefinedAndNotNull(response.clusterProcessGroupStatus)) {
var clusterProcessGroupStatus = response.clusterProcessGroupStatus;
var clusterProcessGroupsGrid = $('#cluster-process-group-summary-table').data('gridInstance');
var clusterProcessGroupsData = clusterProcessGroupsGrid.getData();
var clusterProcessGroups = [];
// populate the table
$.each(clusterProcessGroupStatus.nodeProcessGroupStatus, function (i, nodeProcessGroupStatus) {
clusterProcessGroups.push({
id: nodeProcessGroupStatus.node.nodeId,
node: nodeProcessGroupStatus.node.address + ':' + nodeProcessGroupStatus.node.apiPort,
activeThreadCount: nodeProcessGroupStatus.processGroupStatus.activeThreadCount,
transferred: nodeProcessGroupStatus.processGroupStatus.transferred,
input: nodeProcessGroupStatus.processGroupStatus.input,
queued: nodeProcessGroupStatus.processGroupStatus.queued,
queuedCount: nodeProcessGroupStatus.processGroupStatus.queuedCount,
queuedSize: nodeProcessGroupStatus.processGroupStatus.queuedSize,
output: nodeProcessGroupStatus.processGroupStatus.output,
read: nodeProcessGroupStatus.processGroupStatus.read,
written: nodeProcessGroupStatus.processGroupStatus.written,
sent: nodeProcessGroupStatus.processGroupStatus.sent,
received: nodeProcessGroupStatus.processGroupStatus.received
});
});
// update the input ports
clusterProcessGroupsData.setItems(clusterProcessGroups);
clusterProcessGroupsData.reSort();
clusterProcessGroupsGrid.invalidate();
// populate the input port details
$('#cluster-process-group-name').text(clusterProcessGroupStatus.processGroupName).ellipsis();
$('#cluster-process-group-id').text(clusterProcessGroupStatus.processGroupId);
// update the stats last refreshed timestamp
$('#cluster-process-group-summary-last-refreshed').text(clusterProcessGroupStatus.statsLastRefreshed);
}
}).fail(nf.Common.handleAjaxError);
};
/**
* Loads the cluster input port details dialog for the specified processor.
*
@ -2126,14 +2462,17 @@ nf.SummaryTable = (function () {
};
return {
/**
* URL for loading system diagnostics.
*/
systemDiagnosticsUrl: null,
/**
* URL for loading the summary.
*/
url: null,
/**
* Initializes the status table.
*
@ -2157,6 +2496,7 @@ nf.SummaryTable = (function () {
});
}).promise();
},
/**
* Update the size of the grid based on its container's current size.
*/
@ -2170,6 +2510,11 @@ nf.SummaryTable = (function () {
if (nf.Common.isDefinedAndNotNull(connectionsGrid)) {
connectionsGrid.resizeCanvas();
}
var processGroupsGrid = $('#process-group-summary-table').data('gridInstance');
if (nf.Common.isDefinedAndNotNull(processGroupsGrid)) {
processGroupsGrid.resizeCanvas();
}
var inputPortGrid = $('#input-port-summary-table').data('gridInstance');
if (nf.Common.isDefinedAndNotNull(connectionsGrid)) {
@ -2186,6 +2531,7 @@ nf.SummaryTable = (function () {
remoteProcessGroupGrid.resizeCanvas();
}
},
/**
* Load the processor status table.
*/
@ -2212,6 +2558,14 @@ nf.SummaryTable = (function () {
// get the connections grid/data (do not render bulletins)
var connectionsGrid = $('#connection-summary-table').data('gridInstance');
var connectionsData = connectionsGrid.getData();
// remove any tooltips from the process group table
var processGroupGridElement = $('#process-group-summary-table');
nf.Common.cleanUpTooltips(processGroupGridElement, 'img.has-bulletins');
// get the process group grid/data
var processGroupGrid = processGroupGridElement.data('gridInstance');
var processGroupData = processGroupGrid.getData();
// remove any tooltips from the input port table
var inputPortsGridElement = $('#input-port-summary-table');
@ -2239,12 +2593,13 @@ nf.SummaryTable = (function () {
var processorItems = [];
var connectionItems = [];
var processGroupItems = [];
var inputPortItems = [];
var outputPortItems = [];
var remoteProcessGroupItems = [];
// populate the tables
populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus);
populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus);
// update the processors
processorsData.setItems(processorItems);
@ -2255,6 +2610,11 @@ nf.SummaryTable = (function () {
connectionsData.setItems(connectionItems);
connectionsData.reSort();
connectionsGrid.invalidate();
// update the process groups
processGroupData.setItems(processGroupItems);
processGroupData.reSort();
processGroupGrid.invalidate();
// update the input ports
inputPortsData.setItems(inputPortItems);