NIFI-1947 NIFI-2082:

- Making it more obviously when a nodes cluster state changes.
- Showing which node is the cluster coordinator.

This closes #651
This commit is contained in:
Matt Gilman 2016-07-14 11:00:14 -04:00 committed by Mark Payne
parent 0ce352d203
commit 3373e18158
17 changed files with 349 additions and 120 deletions

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import com.wordnik.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlType;
/**
* Details for the controller configuration.
*/
@XmlType(name = "clusterConfiguration")
public class ClusterSummaryDTO {
private Boolean isClustered;
private Boolean isConnectedToCluster;
private String connectedNodes;
private Integer connectedNodeCount = 0;
private Integer totalNodeCount = 0;
/**
* @return whether this NiFi instance is clustered
*/
@ApiModelProperty(
value = "Whether this NiFi instance is clustered."
)
public Boolean getClustered() {
return isClustered;
}
public void setClustered(Boolean clustered) {
isClustered = clustered;
}
/**
* @return whether this NiFi instance is connected to a cluster
*/
@ApiModelProperty(
value = "Whether this NiFi instance is connected to a cluster."
)
public Boolean getConnectedToCluster() {
return isConnectedToCluster;
}
public void setConnectedToCluster(Boolean connectedToCluster) {
isConnectedToCluster = connectedToCluster;
}
@ApiModelProperty("The number of nodes that are currently connected to the cluster")
public Integer getConnectedNodeCount() {
return connectedNodeCount;
}
public void setConnectedNodeCount(Integer connectedNodeCount) {
this.connectedNodeCount = connectedNodeCount;
}
@ApiModelProperty("The number of nodes in the cluster, regardless of whether or not they are connected")
public Integer getTotalNodeCount() {
return totalNodeCount;
}
public void setTotalNodeCount(Integer totalNodeCount) {
this.totalNodeCount = totalNodeCount;
}
/**
* @return Used in clustering, will report the number of nodes connected vs
* the number of nodes in the cluster
*/
@ApiModelProperty("When clustered, reports the number of nodes connected vs the number of nodes in the cluster.")
public String getConnectedNodes() {
return connectedNodes;
}
public void setConnectedNodes(String connectedNodes) {
this.connectedNodes = connectedNodes;
}
}

View File

@ -33,7 +33,6 @@ public class FlowConfigurationDTO {
private Date currentTime; private Date currentTime;
private Integer timeOffset; private Integer timeOffset;
private Boolean isClustered;
/** /**
* @return interval in seconds between the automatic NiFi refresh requests. This value is read only * @return interval in seconds between the automatic NiFi refresh requests. This value is read only
@ -78,18 +77,4 @@ public class FlowConfigurationDTO {
public void setTimeOffset(Integer timeOffset) { public void setTimeOffset(Integer timeOffset) {
this.timeOffset = timeOffset; this.timeOffset = timeOffset;
} }
/**
* @return whether this NiFi instance is clustered
*/
@ApiModelProperty(
value = "Whether this NiFi instance is clustered."
)
public Boolean getClustered() {
return isClustered;
}
public void setClustered(Boolean clustered) {
isClustered = clustered;
}
} }

View File

@ -23,6 +23,7 @@ import javax.xml.bind.annotation.XmlType;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Details of a node within this NiFi. * Details of a node within this NiFi.
@ -36,7 +37,7 @@ public class NodeDTO {
private String status; private String status;
private Date heartbeat; private Date heartbeat;
private Date connectionRequested; private Date connectionRequested;
private Boolean primary; private Set<String> roles;
private Integer activeThreadCount; private Integer activeThreadCount;
private String queued; private String queued;
private List<NodeEventDTO> events; private List<NodeEventDTO> events;
@ -181,18 +182,18 @@ public class NodeDTO {
} }
/** /**
* @return whether this node is the primary node within the cluster * @return the roles of the node
*/ */
@ApiModelProperty( @ApiModelProperty(
value = "Whether the node is the primary node within the cluster.", value = "The roles of this node.",
readOnly = true readOnly = true
) )
public Boolean isPrimary() { public Set<String> getRoles() {
return primary; return roles;
} }
public void setPrimary(Boolean primary) { public void setRoles(Set<String> roles) {
this.primary = primary; this.roles = roles;
} }
/** /**

View File

@ -31,10 +31,6 @@ public class ControllerStatusDTO implements Cloneable {
private Integer flowFilesQueued = 0; private Integer flowFilesQueued = 0;
private Long bytesQueued = 0L; private Long bytesQueued = 0L;
private String connectedNodes;
private Integer connectedNodeCount = 0;
private Integer totalNodeCount = 0;
private Integer runningCount = 0; private Integer runningCount = 0;
private Integer stoppedCount = 0; private Integer stoppedCount = 0;
private Integer invalidCount = 0; private Integer invalidCount = 0;
@ -68,19 +64,6 @@ public class ControllerStatusDTO implements Cloneable {
this.queued = queued; this.queued = queued;
} }
/**
* @return Used in clustering, will report the number of nodes connected vs
* the number of nodes in the cluster
*/
@ApiModelProperty("When clustered, reports the number of nodes connected vs the number of nodes in the cluster.")
public String getConnectedNodes() {
return connectedNodes;
}
public void setConnectedNodes(String connectedNodes) {
this.connectedNodes = connectedNodes;
}
/** /**
* @return number of running components in this controller * @return number of running components in this controller
*/ */
@ -171,24 +154,6 @@ public class ControllerStatusDTO implements Cloneable {
this.bytesQueued = bytesQueued; this.bytesQueued = bytesQueued;
} }
@ApiModelProperty("The number of nodes that are currently connected to the cluster")
public Integer getConnectedNodeCount() {
return connectedNodeCount;
}
public void setConnectedNodeCount(Integer connectedNodeCount) {
this.connectedNodeCount = connectedNodeCount;
}
@ApiModelProperty("The number of nodes in the cluster, regardless of whether or not they are connected")
public Integer getTotalNodeCount() {
return totalNodeCount;
}
public void setTotalNodeCount(Integer totalNodeCount) {
this.totalNodeCount = totalNodeCount;
}
@Override @Override
public ControllerStatusDTO clone() { public ControllerStatusDTO clone() {
final ControllerStatusDTO other = new ControllerStatusDTO(); final ControllerStatusDTO other = new ControllerStatusDTO();
@ -196,9 +161,6 @@ public class ControllerStatusDTO implements Cloneable {
other.setQueued(getQueued()); other.setQueued(getQueued());
other.setFlowFilesQueued(getFlowFilesQueued()); other.setFlowFilesQueued(getFlowFilesQueued());
other.setBytesQueued(getBytesQueued()); other.setBytesQueued(getBytesQueued());
other.setConnectedNodes(getConnectedNodes());
other.setConnectedNodeCount(getConnectedNodeCount());
other.setTotalNodeCount(getTotalNodeCount());
other.setRunningCount(getRunningCount()); other.setRunningCount(getRunningCount());
other.setStoppedCount(getStoppedCount()); other.setStoppedCount(getStoppedCount());
other.setInvalidCount(getInvalidCount()); other.setInvalidCount(getInvalidCount());

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import org.apache.nifi.web.api.dto.ClusterSummaryDTO;
import javax.xml.bind.annotation.XmlRootElement;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ClusterSummaryDTO.
*/
@XmlRootElement(name = "controllerStatusEntity")
public class ClusteSummaryEntity extends Entity {
private ClusterSummaryDTO clusterSummary;
/**
* The ClusterSummaryDTO that is being serialized.
*
* @return The ClusterSummaryDTO object
*/
public ClusterSummaryDTO getClusterSummary() {
return clusterSummary;
}
public void setClusterSummary(ClusterSummaryDTO clusterSummary) {
this.clusterSummary = clusterSummary;
}
}

View File

@ -70,7 +70,6 @@ public class StatusMerger {
public static void updatePrettyPrintedFields(final ControllerStatusDTO target) { public static void updatePrettyPrintedFields(final ControllerStatusDTO target) {
target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
target.setConnectedNodes(formatCount(target.getConnectedNodeCount()) + " / " + formatCount(target.getTotalNodeCount()));
} }
public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {

View File

@ -2769,7 +2769,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// create node dtos // create node dtos
final Collection<NodeDTO> nodeDtos = new ArrayList<>(); final Collection<NodeDTO> nodeDtos = new ArrayList<>();
clusterDto.setNodes(nodeDtos); clusterDto.setNodes(nodeDtos);
final NodeIdentifier primaryNode = clusterCoordinator.getPrimaryNode();
for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) { if (status == null) {
@ -2777,9 +2776,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} }
final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId); final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
final boolean primary = primaryNode != null && primaryNode.equals(nodeId); final Set<String> nodeRoles = clusterCoordinator.getConnectionStatus(nodeId).getRoles();
final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId); final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
nodeDtos.add(dtoFactory.createNodeDTO(nodeId, status, heartbeat, events, primary)); nodeDtos.add(dtoFactory.createNodeDTO(nodeId, status, heartbeat, events, nodeRoles));
} }
return clusterDto; return clusterDto;
@ -2794,9 +2793,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private NodeDTO getNode(final NodeIdentifier nodeId) { private NodeDTO getNode(final NodeIdentifier nodeId) {
final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId); final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId);
final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId); final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
final boolean primary = nodeId.equals(clusterCoordinator.getPrimaryNode()); final Set<String> roles = clusterCoordinator.getConnectionStatus(nodeId).getRoles();
final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId); final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, primary); return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles);
} }
@Override @Override

View File

@ -694,7 +694,11 @@ public abstract class ApplicationResource {
* if running in standalone mode or disconnected from cluster * if running in standalone mode or disconnected from cluster
*/ */
boolean isConnectedToCluster() { boolean isConnectedToCluster() {
return clusterCoordinator != null && clusterCoordinator.isConnected(); return isClustered() && clusterCoordinator.isConnected();
}
boolean isClustered () {
return clusterCoordinator != null;
} }
public void setRequestReplicator(final RequestReplicator requestReplicator) { public void setRequestReplicator(final RequestReplicator requestReplicator) {

View File

@ -36,8 +36,10 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
@ -48,6 +50,7 @@ import org.apache.nifi.web.api.dto.AboutDTO;
import org.apache.nifi.web.api.dto.BannerDTO; import org.apache.nifi.web.api.dto.BannerDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterSummaryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO; import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@ -72,6 +75,7 @@ import org.apache.nifi.web.api.entity.AboutEntity;
import org.apache.nifi.web.api.entity.ActionEntity; import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.BannerEntity; import org.apache.nifi.web.api.entity.BannerEntity;
import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.BulletinBoardEntity;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
import org.apache.nifi.web.api.entity.ComponentHistoryEntity; import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
@ -81,7 +85,6 @@ import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ControllerStatusEntity; import org.apache.nifi.web.api.entity.ControllerStatusEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity; import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.HistoryEntity; import org.apache.nifi.web.api.entity.HistoryEntity;
import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.PortStatusEntity;
@ -322,15 +325,7 @@ public class FlowResource extends ApplicationResource {
authorizeFlow(); authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final FlowConfigurationEntity entity = serviceFacade.getFlowConfiguration(); final FlowConfigurationEntity entity = serviceFacade.getFlowConfiguration();
// include details about cluster state
entity.getFlowConfiguration().setClustered(isConnectedToCluster());
return clusterContext(generateOkResponse(entity)).build(); return clusterContext(generateOkResponse(entity)).build();
} }
@ -778,7 +773,7 @@ public class FlowResource extends ApplicationResource {
// TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@ApiOperation( @ApiOperation(
value = "Gets the current status of this NiFi", value = "Gets the current status of this NiFi",
response = Entity.class, response = ControllerStatusEntity.class,
authorizations = { authorizations = {
@Authorization(value = "Read Only", type = "ROLE_MONITOR"), @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
@ -811,6 +806,66 @@ public class FlowResource extends ApplicationResource {
return clusterContext(generateOkResponse(entity)).build(); return clusterContext(generateOkResponse(entity)).build();
} }
/**
* Retrieves the status for this NiFi.
*
* @return A controllerStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/summary")
// TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@ApiOperation(
value = "Gets the current status of this NiFi",
response = ControllerStatusEntity.class,
authorizations = {
@Authorization(value = "Read Only", type = "ROLE_MONITOR"),
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
@Authorization(value = "Administrator", type = "ROLE_ADMIN")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getClusterSummary() throws InterruptedException {
authorizeFlow();
final ClusterSummaryDTO clusterConfiguration = new ClusterSummaryDTO();
final ClusterCoordinator clusterCoordinator = getClusterCoordinator();
if (clusterCoordinator != null && clusterCoordinator.isConnected()) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
int totalNodeCount = 0;
for (final List<NodeIdentifier> nodeList : stateMap.values()) {
totalNodeCount += nodeList.size();
}
final List<NodeIdentifier> connectedNodeIds = stateMap.get(NodeConnectionState.CONNECTED);
final int connectedNodeCount = (connectedNodeIds == null) ? 0 : connectedNodeIds.size();
clusterConfiguration.setConnectedNodeCount(connectedNodeCount);
clusterConfiguration.setTotalNodeCount(totalNodeCount);
clusterConfiguration.setConnectedNodes(connectedNodeCount + " / " + totalNodeCount);
}
clusterConfiguration.setClustered(isClustered());
clusterConfiguration.setConnectedToCluster(isConnectedToCluster());
// create the response entity
final ClusteSummaryEntity entity = new ClusteSummaryEntity();
entity.setClusterSummary(clusterConfiguration);
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
/** /**
* Retrieves the controller level bulletins. * Retrieves the controller level bulletins.
* *

View File

@ -2943,7 +2943,7 @@ public final class DtoFactory {
return dto; return dto;
} }
public NodeDTO createNodeDTO(final NodeIdentifier nodeId, final NodeConnectionStatus status, final NodeHeartbeat nodeHeartbeat, final List<NodeEvent> events, final boolean primary) { public NodeDTO createNodeDTO(final NodeIdentifier nodeId, final NodeConnectionStatus status, final NodeHeartbeat nodeHeartbeat, final List<NodeEvent> events, final Set<String> roles) {
final NodeDTO nodeDto = new NodeDTO(); final NodeDTO nodeDto = new NodeDTO();
// populate node dto // populate node dto
@ -2951,7 +2951,7 @@ public final class DtoFactory {
nodeDto.setAddress(nodeId.getApiAddress()); nodeDto.setAddress(nodeId.getApiAddress());
nodeDto.setApiPort(nodeId.getApiPort()); nodeDto.setApiPort(nodeId.getApiPort());
nodeDto.setStatus(status.getState().name()); nodeDto.setStatus(status.getState().name());
nodeDto.setPrimary(primary); nodeDto.setRoles(roles);
if (status.getConnectionRequestTime() != null) { if (status.getConnectionRequestTime() != null) {
final Date connectionRequested = new Date(status.getConnectionRequestTime()); final Date connectionRequested = new Date(status.getConnectionRequestTime());
nodeDto.setConnectionRequested(connectionRequested); nodeDto.setConnectionRequested(connectionRequested);

View File

@ -32,7 +32,6 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
@ -495,20 +494,6 @@ public class ControllerFacade implements Authorizable {
controllerStatus.setBytesQueued(controllerQueueSize.getByteCount()); controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount()); controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
if (clusterCoordinator != null && clusterCoordinator.isConnected()) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
int totalNodeCount = 0;
for (final List<NodeIdentifier> nodeList : stateMap.values()) {
totalNodeCount += nodeList.size();
}
final List<NodeIdentifier> connectedNodeIds = stateMap.get(NodeConnectionState.CONNECTED);
final int connectedNodeCount = (connectedNodeIds == null) ? 0 : connectedNodeIds.size();
controllerStatus.setConnectedNodeCount(connectedNodeCount);
controllerStatus.setTotalNodeCount(totalNodeCount);
controllerStatus.setConnectedNodes(connectedNodeCount + " / " + totalNodeCount);
}
final ProcessGroupCounts counts = rootGroup.getCounts(); final ProcessGroupCounts counts = rootGroup.getCounts();
controllerStatus.setRunningCount(counts.getRunningCount()); controllerStatus.setRunningCount(counts.getRunningCount());
controllerStatus.setStoppedCount(counts.getStoppedCount()); controllerStatus.setStoppedCount(counts.getStoppedCount());

View File

@ -111,6 +111,10 @@
background-color: transparent; background-color: transparent;
} }
#connected-nodes-count.connection-warning {
color: #BA554A;
}
/* search field */ /* search field */
div.search-result-icon { div.search-result-icon {
float: left; float: left;

View File

@ -337,6 +337,43 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
}).fail(nf.Common.handleAjaxError); }).fail(nf.Common.handleAjaxError);
}, },
/**
* Updates the cluster summary.
*
* @param clusterSummary
*/
updateClusterSummary: function (clusterSummary) {
// see if this node has been (dis)connected
if (nf.Canvas.isConnectedToCluster() !== clusterSummary.connectedToCluster) {
if (clusterSummary.connectedToCluster) {
nf.Canvas.showConnectedToClusterMessage();
} else {
nf.Canvas.showDisconnectedFromClusterMessage();
}
}
var color = '#728E9B';
// update the connection state
if (clusterSummary.connectedToCluster) {
if (nf.Common.isDefinedAndNotNull(clusterSummary.connectedNodes)) {
var connectedNodes = clusterSummary.connectedNodes.split(' / ');
if (connectedNodes.length === 2 && connectedNodes[0] !== connectedNodes[1]) {
this.clusterConnectionWarning = true;
color = '#BA554A';
}
}
this.connectedNodesCount =
nf.Common.isDefinedAndNotNull(clusterSummary.connectedNodes) ? $sanitize(clusterSummary.connectedNodes) : '-';
} else {
this.connectedNodesCount = 'Disconnected';
color = '#BA554A';
}
// update the color
$('#connected-nodes-count').closest('div.fa-cubes').css('color', color);
},
/** /**
* Update the flow status counts. * Update the flow status counts.
* *
@ -348,13 +385,6 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
'#BA554A' : '#728E9B'; '#BA554A' : '#728E9B';
$('#controller-invalid-count').parent().css('color', controllerInvalidCountColor); $('#controller-invalid-count').parent().css('color', controllerInvalidCountColor);
if (nf.Common.isDefinedAndNotNull(status.connectedNodes)) {
var connectedNodes = status.connectedNodes.split(' / ');
var connectedNodesCountColor =
(connectedNodes.length === 2 && connectedNodes[0] !== connectedNodes[1]) ? '#BA554A' : '#728E9B';
$('#connected-nodes-count').parent().css('color', connectedNodesCountColor);
}
// update the report values // update the report values
this.activeThreadCount = $sanitize(status.activeThreadCount); this.activeThreadCount = $sanitize(status.activeThreadCount);
this.totalQueued = $sanitize(status.queued); this.totalQueued = $sanitize(status.queued);
@ -380,9 +410,6 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
this.controllerDisabledCount = this.controllerDisabledCount =
nf.Common.isDefinedAndNotNull(status.disabledCount) ? $sanitize(status.disabledCount) : '-'; nf.Common.isDefinedAndNotNull(status.disabledCount) ? $sanitize(status.disabledCount) : '-';
this.connectedNodesCount =
nf.Common.isDefinedAndNotNull(status.connectedNodes) ? $sanitize(status.connectedNodes) : '-';
}, },
/** /**

View File

@ -141,7 +141,7 @@ nf.ng.Canvas.GlobalMenuCtrl = function (serviceProvider) {
* @returns {*|boolean} * @returns {*|boolean}
*/ */
visible: function () { visible: function () {
return nf.Canvas.isClustered(); return nf.Canvas.isConnectedToCluster();
}, },
/** /**

View File

@ -117,6 +117,7 @@ nf.Canvas = (function () {
var permissions = null; var permissions = null;
var parentGroupId = null; var parentGroupId = null;
var clustered = false; var clustered = false;
var connectedToCluster = false;
var svg = null; var svg = null;
var canvas = null; var canvas = null;
@ -132,6 +133,7 @@ nf.Canvas = (function () {
revision: '../nifi-api/flow/revision', revision: '../nifi-api/flow/revision',
banners: '../nifi-api/flow/banners', banners: '../nifi-api/flow/banners',
flowConfig: '../nifi-api/flow/config', flowConfig: '../nifi-api/flow/config',
clusterSummary: '../nifi-api/flow/cluster/summary',
cluster: '../nifi-api/controller/cluster' cluster: '../nifi-api/controller/cluster'
} }
}; };
@ -686,6 +688,19 @@ nf.Canvas = (function () {
}); });
}; };
/**
* Loads the flow configuration and updated the cluster state.
*
* @returns xhr
*/
var loadClusterSummary = function () {
return $.ajax({
type: 'GET',
url: config.urls.clusterSummary,
dataType: 'json'
});
};
return { return {
CANVAS_OFFSET: 0, CANVAS_OFFSET: 0,
@ -718,7 +733,7 @@ nf.Canvas = (function () {
// hide the context menu // hide the context menu
nf.ContextMenu.hide(); nf.ContextMenu.hide();
// get the process group to refresh everything // issue the requests
var processGroupXhr = reloadProcessGroup(nf.Canvas.getGroupId(), options); var processGroupXhr = reloadProcessGroup(nf.Canvas.getGroupId(), options);
var statusXhr = nf.ng.Bridge.injector.get('flowStatusCtrl').reloadFlowStatus(); var statusXhr = nf.ng.Bridge.injector.get('flowStatusCtrl').reloadFlowStatus();
var currentUserXhr = loadCurrentUser(); var currentUserXhr = loadCurrentUser();
@ -732,9 +747,19 @@ nf.Canvas = (function () {
}).fail(function (xhr, status, error) { }).fail(function (xhr, status, error) {
deferred.reject(xhr, status, error); deferred.reject(xhr, status, error);
}); });
var clusterSummary = loadClusterSummary().done(function (response) {
var clusterSummary = response.clusterSummary;
// update the cluster summary
nf.ng.Bridge.injector.get('flowStatusCtrl').updateClusterSummary(clusterSummary);
// update the clustered flag
clustered = clusterSummary.clustered;
connectedToCluster = clusterSummary.connectedToCluster;
});
// wait for all requests to complete // wait for all requests to complete
$.when(processGroupXhr, statusXhr, currentUserXhr, controllerBulletins).done(function (processGroupResult) { $.when(processGroupXhr, statusXhr, currentUserXhr, controllerBulletins, clusterSummary).done(function (processGroupResult) {
// inform Angular app values have changed // inform Angular app values have changed
nf.ng.Bridge.digest(); nf.ng.Bridge.digest();
@ -746,6 +771,26 @@ nf.Canvas = (function () {
}).promise(); }).promise();
}, },
/**
* Shows a message when disconnected from the cluster.
*/
showDisconnectedFromClusterMessage: function () {
nf.Dialog.showOkDialog({
headerText: 'Cluster Connection',
dialogContent: 'This node is currently not connected to the cluster. Any modifications to the data flow made here will not replicate across the cluster.'
});
},
/**
* Shows a message when connected to the cluster.
*/
showConnectedToClusterMessage: function () {
nf.Dialog.showOkDialog({
headerText: 'Cluster Connection',
dialogContent: 'This node just joined the cluster. Any modifications to the data flow made here will replicate across the cluster.'
});
},
/** /**
* Initialize NiFi. * Initialize NiFi.
*/ */
@ -811,9 +856,13 @@ nf.Canvas = (function () {
dataType: 'json' dataType: 'json'
}); });
// get the initial cluster summary
var clusterSummary = loadClusterSummary();
// ensure the config requests are loaded // ensure the config requests are loaded
$.when(configXhr, userXhr, clientXhr).done(function (configResult, loginResult, aboutResult) { $.when(configXhr, clusterSummary, userXhr, clientXhr).done(function (configResult, clusterSummaryResult) {
var configResponse = configResult[0]; var configResponse = configResult[0];
var clusterSummaryResponse = clusterSummaryResult[0];
// calculate the canvas offset // calculate the canvas offset
var canvasContainer = $('#canvas-container'); var canvasContainer = $('#canvas-container');
@ -821,9 +870,19 @@ nf.Canvas = (function () {
// get the config details // get the config details
var configDetails = configResponse.flowConfiguration; var configDetails = configResponse.flowConfiguration;
var clusterSummary = clusterSummaryResponse.clusterSummary;
// update the clustered flag // show disconnected message on load if necessary
clustered = configDetails.clustered; if (clusterSummary.connectedToCluster === false) {
nf.Canvas.showDisconnectedFromClusterMessage();
}
// establish the initial cluster state
clustered = clusterSummary.clustered;
connectedToCluster = clusterSummary.connectedToCluster;
// update the cluster summary
nf.ng.Bridge.injector.get('flowStatusCtrl').updateClusterSummary(clusterSummary);
// get the auto refresh interval // get the auto refresh interval
var autoRefreshIntervalSeconds = parseInt(configDetails.autoRefreshIntervalSeconds, 10); var autoRefreshIntervalSeconds = parseInt(configDetails.autoRefreshIntervalSeconds, 10);
@ -895,6 +954,15 @@ nf.Canvas = (function () {
return clustered === true; return clustered === true;
}, },
/**
* Return whether this instance is connected to a cluster.
*
* @returns {boolean}
*/
isConnectedToCluster: function () {
return connectedToCluster === true;
},
/** /**
* Set the group id. * Set the group id.
* *

View File

@ -23,6 +23,8 @@ nf.ClusterTable = (function () {
* Configuration object used to hold a number of configuration items. * Configuration object used to hold a number of configuration items.
*/ */
var config = { var config = {
primaryNode: 'Primary Node',
clusterCoorindator: 'Cluster Coordinator',
filterText: 'Filter', filterText: 'Filter',
styles: { styles: {
filterList: 'cluster-filter-list' filterList: 'cluster-filter-list'
@ -65,11 +67,11 @@ nf.ClusterTable = (function () {
} }
} else if (sortDetails.columnId === 'status') { } else if (sortDetails.columnId === 'status') {
var aString = nf.Common.isDefinedAndNotNull(a[sortDetails.columnId]) ? a[sortDetails.columnId] : ''; var aString = nf.Common.isDefinedAndNotNull(a[sortDetails.columnId]) ? a[sortDetails.columnId] : '';
if (a.primary === true) { if (a.roles.includes(config.primaryNode)) {
aString += ', PRIMARY'; aString += ', PRIMARY';
} }
var bString = nf.Common.isDefinedAndNotNull(b[sortDetails.columnId]) ? b[sortDetails.columnId] : ''; var bString = nf.Common.isDefinedAndNotNull(b[sortDetails.columnId]) ? b[sortDetails.columnId] : '';
if (b.primary === true) { if (b.roles.includes(config.primaryNode)) {
bString += ', PRIMARY'; bString += ', PRIMARY';
} }
return aString === bString ? 0 : aString > bString ? 1 : -1; return aString === bString ? 0 : aString > bString ? 1 : -1;
@ -409,11 +411,14 @@ nf.ClusterTable = (function () {
// define a custom formatter for the status column // define a custom formatter for the status column
var statusFormatter = function (row, cell, value, columnDef, dataContext) { var statusFormatter = function (row, cell, value, columnDef, dataContext) {
if (dataContext.primary === true) { var markup = value;
return value + ', PRIMARY'; if (dataContext.roles.includes(config.primaryNode)) {
} else { value += ', PRIMARY';
return value;
} }
if (dataContext.roles.includes(config.clusterCoorindator)) {
value += ', COORDINATOR';
}
return value;
}; };
var columnModel = [ var columnModel = [
@ -433,9 +438,6 @@ nf.ClusterTable = (function () {
var canDisconnect = false; var canDisconnect = false;
var canConnect = false; var canConnect = false;
// determine if this node is already the primary
var isPrimary = dataContext.primary;
// determine the current status // determine the current status
if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') { if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') {
canDisconnect = true; canDisconnect = true;

View File

@ -426,7 +426,7 @@ nf.Common = (function () {
} }
// status code 400, 403, 404, and 409 are expected response codes for common errors. // status code 400, 403, 404, and 409 are expected response codes for common errors.
if (xhr.status === 400 || xhr.status === 403 || xhr.status === 404 || xhr.status === 409) { if (xhr.status === 400 || xhr.status === 403 || xhr.status === 404 || xhr.status === 409 || xhr.status === 503) {
nf.Dialog.showOkDialog({ nf.Dialog.showOkDialog({
headerText: 'Error', headerText: 'Error',
dialogContent: nf.Common.escapeHtml(xhr.responseText) dialogContent: nf.Common.escapeHtml(xhr.responseText)