diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 2c7f55b5ae..7cd0e30393 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -267,6 +267,8 @@ public interface FlowFileQueue { void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute); + void offloadQueue(); + LoadBalanceStrategy getLoadBalanceStrategy(); void setLoadBalanceCompression(LoadBalanceCompression compression); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 11786c27c0..2ad0e70971 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.coordination; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -61,6 +62,23 @@ public interface ClusterCoordinator { */ void finishNodeConnection(NodeIdentifier nodeId); + /** + * Indicates that the node has finished being offloaded + * + * @param nodeId the identifier of the node + */ + void finishNodeOffload(NodeIdentifier nodeId); + + /** + * Sends a request to the node to be offloaded. + * The node will be marked as offloading immediately. + * + * @param nodeId the identifier of the node + * @param offloadCode the code that represents why this node is being asked to be offloaded + * @param explanation an explanation as to why the node is being asked to be offloaded + */ + void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation); + /** * Sends a request to the node to disconnect from the cluster. * The node will be marked as disconnected immediately. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java index 54cc4de117..ad9be3d738 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java @@ -23,6 +23,8 @@ public interface ClusterTopologyEventListener { void onNodeAdded(NodeIdentifier nodeId); + void onNodeOffloaded(NodeIdentifier nodeId); + void onNodeRemoved(NodeIdentifier nodeId); void onLocalNodeIdentifierSet(NodeIdentifier localNodeId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java index 8d5824f171..d79552c8cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java @@ -36,12 +36,22 @@ public enum NodeConnectionState { */ CONNECTED, + /** + * A node that is in the process of offloading its flow files from the node. + */ + OFFLOADING, + /** * A node that is in the process of disconnecting from the cluster. * A DISCONNECTING node will always transition to DISCONNECTED. */ DISCONNECTING, + /** + * A node that has offloaded its flow files from the node. + */ + OFFLOADED, + /** * A node that is not connected to the cluster. * A DISCONNECTED node can transition to CONNECTING. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java index 34bd1279e3..7d8a94049c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java @@ -35,47 +35,53 @@ public class NodeConnectionStatus { private final long updateId; private final NodeIdentifier nodeId; private final NodeConnectionState state; + private final OffloadCode offloadCode; private final DisconnectionCode disconnectCode; - private final String disconnectReason; + private final String reason; private final Long connectionRequestTime; public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) { - this(nodeId, state, null, null, null); + this(nodeId, state, null, null, null, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.toString(), null); + this(nodeId, NodeConnectionState.DISCONNECTED, null, disconnectionCode, disconnectionCode.toString(), null); + } + + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, final String offloadExplanation) { + this(nodeId, state, offloadCode, null, offloadExplanation, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String disconnectionExplanation) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null); + this(nodeId, NodeConnectionState.DISCONNECTED, null, disconnectionCode, disconnectionExplanation, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode) { - this(nodeId, state, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); + this(nodeId, state, null, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); } public NodeConnectionStatus(final NodeConnectionStatus status) { - this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime()); + this(status.getNodeIdentifier(), status.getState(), status.getOffloadCode(), status.getDisconnectCode(), status.getReason(), status.getConnectionRequestTime()); } - public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime) { - this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime); + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, + final DisconnectionCode disconnectCode, final String reason, final Long connectionRequestTime) { + this(idGenerator.getAndIncrement(), nodeId, state, offloadCode, disconnectCode, reason, connectionRequestTime); } - public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime) { + public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, + final DisconnectionCode disconnectCode, final String reason, final Long connectionRequestTime) { this.updateId = updateId; this.nodeId = nodeId; this.state = state; + this.offloadCode = offloadCode; if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) { this.disconnectCode = DisconnectionCode.UNKNOWN; - this.disconnectReason = this.disconnectCode.toString(); + this.reason = this.disconnectCode.toString(); } else { this.disconnectCode = disconnectCode; - this.disconnectReason = disconnectReason; + this.reason = reason; } this.connectionRequestTime = (connectionRequestTime == null && state == NodeConnectionState.CONNECTING) ? Long.valueOf(System.currentTimeMillis()) : connectionRequestTime; @@ -93,12 +99,16 @@ public class NodeConnectionStatus { return state; } + public OffloadCode getOffloadCode() { + return offloadCode; + } + public DisconnectionCode getDisconnectCode() { return disconnectCode; } - public String getDisconnectReason() { - return disconnectReason; + public String getReason() { + return reason; } public Long getConnectionRequestTime() { @@ -110,8 +120,11 @@ public class NodeConnectionStatus { final StringBuilder sb = new StringBuilder(); final NodeConnectionState state = getState(); sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", state=").append(state); + if (state == NodeConnectionState.OFFLOADED || state == NodeConnectionState.OFFLOADING) { + sb.append(", Offload Code=").append(getOffloadCode()).append(", Offload Reason=").append(getReason()); + } if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) { - sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason()); + sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getReason()); } sb.append(", updateId=").append(getUpdateIdentifier()); sb.append("]"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java new file mode 100644 index 0000000000..fb4d30bbc5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.node; + +/** + * An enumeration of the reasons that a node may be offloaded + */ +public enum OffloadCode { + + /** + * A user explicitly offloaded the node + */ + OFFLOADED("Node Offloaded"); + + private final String description; + + OffloadCode(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index 986231efd4..b5485ccd56 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol; import java.util.Set; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -40,6 +41,14 @@ public interface ClusterCoordinationProtocolSender { */ ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException; + /** + * Sends an "offload request" message to a node. + * + * @param msg a message + * @throws ProtocolException if communication failed + */ + void offload(OffloadMessage msg) throws ProtocolException; + /** * Sends a "disconnection request" message to a node. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java index ae3a0e5057..74cc6b476a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java @@ -26,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -100,6 +101,11 @@ public class ClusterCoordinationProtocolSenderListener implements ClusterCoordin return sender.requestReconnection(msg); } + @Override + public void offload(OffloadMessage msg) throws ProtocolException { + sender.offload(msg); + } + @Override public void disconnect(DisconnectMessage msg) throws ProtocolException { sender.disconnect(msg); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index 9eaffd37b9..c588a6807d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -210,6 +211,8 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi return ((ConnectionRequestMessage) message).getConnectionRequest().getProposedNodeIdentifier(); case HEARTBEAT: return ((HeartbeatMessage) message).getHeartbeat().getNodeIdentifier(); + case OFFLOAD_REQUEST: + return ((OffloadMessage) message).getNodeId(); case DISCONNECTION_REQUEST: return ((DisconnectMessage) message).getNodeId(); case FLOW_REQUEST: diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index 167ddec932..b21068ffe5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -36,6 +36,7 @@ import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; @@ -128,6 +129,31 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } } + /** + * Requests a node to be offloaded. The configured value for + * handshake timeout is applied to the socket before making the request. + * + * @param msg a message + * @throws ProtocolException if the message failed to be sent + */ + @Override + public void offload(final OffloadMessage msg) throws ProtocolException { + Socket socket = null; + try { + socket = createSocket(msg.getNodeId(), true); + + // marshal message to output stream + try { + final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + marshaller.marshal(msg, socket.getOutputStream()); + } catch (final IOException ioe) { + throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); + } + } finally { + SocketUtils.closeQuietly(socket); + } + } + /** * Requests a node to disconnect from the cluster. The configured value for * handshake timeout is applied to the socket before making the request. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java index c8c4acf646..5eae83e0e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.protocol.jaxb.message; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -25,8 +26,9 @@ public class AdaptedNodeConnectionStatus { private Long updateId; private NodeIdentifier nodeId; private NodeConnectionState state; + private OffloadCode offloadCode; private DisconnectionCode disconnectCode; - private String disconnectReason; + private String reason; private Long connectionRequestTime; public Long getUpdateId() { @@ -53,20 +55,28 @@ public class AdaptedNodeConnectionStatus { this.state = state; } + public OffloadCode getOffloadCode() { + return offloadCode; + } + public DisconnectionCode getDisconnectCode() { return disconnectCode; } + public void setOffloadCode(OffloadCode offloadCode) { + this.offloadCode = offloadCode; + } + public void setDisconnectCode(DisconnectionCode disconnectCode) { this.disconnectCode = disconnectCode; } - public String getDisconnectReason() { - return disconnectReason; + public String getReason() { + return reason; } - public void setDisconnectReason(String disconnectReason) { - this.disconnectReason = disconnectReason; + public void setReason(String reason) { + this.reason = reason; } public Long getConnectionRequestTime() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java index ec209de1f5..47e92e8d2a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java @@ -28,8 +28,9 @@ public class NodeConnectionStatusAdapter extends XmlAdapter offloaded = stateMap.get(NodeConnectionState.OFFLOADED); + if (offloaded != null && !offloaded.isEmpty()) { + if (offloaded.size() == 1) { + throw new OffloadedNodeMutableRequestException("Node " + offloaded.iterator().next() + " is currently offloaded"); + } else { + throw new OffloadedNodeMutableRequestException(offloaded.size() + " Nodes are currently offloaded"); + } + } + + final List offloading = stateMap.get(NodeConnectionState.OFFLOADING); + if (offloading != null && !offloading.isEmpty()) { + if (offloading.size() == 1) { + throw new OffloadedNodeMutableRequestException("Node " + offloading.iterator().next() + " is currently offloading"); + } else { + throw new OffloadedNodeMutableRequestException(offloading.size() + " Nodes are currently offloading"); + } + } + final List disconnected = stateMap.get(NodeConnectionState.DISCONNECTED); if (disconnected != null && !disconnected.isEmpty()) { if (disconnected.size() == 1) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index b24475e085..e165041775 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -34,6 +34,7 @@ import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; @@ -49,6 +50,7 @@ import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; @@ -431,7 +433,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn); } - updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis())); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis())); // create the request final ReconnectionRequestMessage request = new ReconnectionRequestMessage(); @@ -469,6 +471,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } + @Override + public void finishNodeOffload(final NodeIdentifier nodeId) { + final NodeConnectionState state = getConnectionState(nodeId); + if (state == null) { + logger.warn("Attempted to finish node offload for {} but node is not known.", nodeId); + return; + } + + if (state != NodeConnectionState.OFFLOADING) { + logger.warn("Attempted to finish node offload for {} but node is not in a offload state, it is currently {}.", nodeId, state); + return; + } + + logger.info("{} is now offloaded", nodeId); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + + @Override + public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) { + final Set offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED); + if (offloadNodeIds.contains(nodeId)) { + logger.debug("Attempted to offload node but the node is already offloading or offloaded"); + // no need to do anything here, the node is currently offloading or already offloaded + return; + } + + final Set disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED); + if (!disconnectedNodeIds.contains(nodeId)) { + throw new IllegalNodeOffloadException("Cannot offload node " + nodeId + " because it is not currently disconnected"); + } + + logger.info("Requesting that {} is offloaded due to {}", nodeId, explanation == null ? offloadCode : explanation); + + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, offloadCode, explanation)); + + final OffloadMessage request = new OffloadMessage(); + request.setNodeId(nodeId); + request.setExplanation(explanation); + + addNodeEvent(nodeId, "Offload requested due to " + explanation); + onNodeOffloaded(nodeId); + offloadAsynchronously(request, 10, 5); + } + @Override public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); @@ -526,17 +572,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl storeState(); } + private void onNodeOffloaded(final NodeIdentifier nodeId) { + eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId)); + } + private void onNodeRemoved(final NodeIdentifier nodeId) { - eventListeners.stream().forEach(listener -> listener.onNodeRemoved(nodeId)); + eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId)); } private void onNodeAdded(final NodeIdentifier nodeId, final boolean storeState) { if (storeState) { storeState(); } - - - eventListeners.stream().forEach(listener -> listener.onNodeAdded(nodeId)); + eventListeners.forEach(listener -> listener.onNodeAdded(nodeId)); } @Override @@ -821,7 +869,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. final Set nodesToNotify; if (notifyAllNodes) { - nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); + nodesToNotify = getNodeIdentifiers(); // Do not notify ourselves because we already know about the status update. nodesToNotify.remove(getLocalNodeIdentifier()); @@ -841,6 +889,34 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl senderListener.notifyNodeStatusChange(nodesToNotify, message); } + private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) { + final Thread offloadThread = new Thread(new Runnable() { + @Override + public void run() { + final NodeIdentifier nodeId = request.getNodeId(); + + for (int i = 0; i < attempts; i++) { + try { + senderListener.offload(request); + reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation()); + return; + } catch (final Exception e) { + logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e); + + try { + Thread.sleep(retrySeconds * 1000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + }, "Offload " + request.getNodeId()); + + offloadThread.start(); + } + private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) { final Thread disconnectThread = new Thread(new Runnable() { @Override @@ -961,8 +1037,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (oldStatus == null || status.getState() != oldStatus.getState()) { sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString()); - if (status.getDisconnectReason() != null) { - sb.append(" due to ").append(status.getDisconnectReason()); + if (status.getReason() != null) { + sb.append(" due to ").append(status.getReason()); } else if (status.getDisconnectCode() != null) { sb.append(" due to ").append(status.getDisconnectCode().toString()); } @@ -1118,7 +1194,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting."); } - status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()); + status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis()); updateNodeStatus(status); final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(), diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java new file mode 100644 index 0000000000..f1bc6694d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when an offload request is issued to a node that cannot be offloaded (e.g., not currently disconnected). + */ +public class IllegalNodeOffloadException extends IllegalClusterStateException { + + public IllegalNodeOffloadException() { + } + + public IllegalNodeOffloadException(String msg) { + super(msg); + } + + public IllegalNodeOffloadException(Throwable cause) { + super(cause); + } + + public IllegalNodeOffloadException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java new file mode 100644 index 0000000000..3663349735 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's dataflow is to be replicated while one or more nodes are offloaded. + * + */ +public class OffloadedNodeMutableRequestException extends MutableRequestException { + + public OffloadedNodeMutableRequestException() { + } + + public OffloadedNodeMutableRequestException(String msg) { + super(msg); + } + + public OffloadedNodeMutableRequestException(Throwable cause) { + super(cause); + } + + public OffloadedNodeMutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 6ea019d947..4aeff7b3eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -20,6 +20,7 @@ package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -244,6 +245,16 @@ public class TestAbstractHeartbeatMonitor { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } + @Override + public synchronized void finishNodeOffload(NodeIdentifier nodeId) { + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + + @Override + public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) { + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + @Override public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index fb06a15dec..5ce2985c03 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -280,7 +280,7 @@ public class TestNodeClusterCoordinator { assertNotNull(statusChange); assertEquals(createNodeId(1), statusChange.getNodeIdentifier()); assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChange.getDisconnectCode()); - assertEquals("Unit Test", statusChange.getDisconnectReason()); + assertEquals("Unit Test", statusChange.getReason()); } @Test @@ -407,7 +407,7 @@ public class TestNodeClusterCoordinator { nodeStatuses.clear(); final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED, - DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L); + null, DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L); final NodeStatusChangeMessage msg = new NodeStatusChangeMessage(); msg.setNodeId(nodeId1); msg.setNodeConnectionStatus(oldStatus); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 7a5c45e087..297595f4df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -23,6 +23,7 @@ import org.apache.nifi.authorization.ManagedAuthorizer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.cluster.ConnectionException; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -36,6 +37,7 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; @@ -44,12 +46,14 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.lifecycle.LifeCycleStartException; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.NarClassLoaders; @@ -381,6 +385,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { public boolean canHandle(final ProtocolMessage msg) { switch (msg.getType()) { case RECONNECTION_REQUEST: + case OFFLOAD_REQUEST: case DISCONNECTION_REQUEST: case FLOW_REQUEST: return true; @@ -415,6 +420,22 @@ public class StandardFlowService implements FlowService, ProtocolHandler { return new ReconnectionResponseMessage(); } + case OFFLOAD_REQUEST: { + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + handleOffloadRequest((OffloadMessage) request); + } catch (InterruptedException e) { + throw new ProtocolException("Could not complete offload request", e); + } + } + }, "Offload Flow Files from Node"); + t.setDaemon(true); + t.start(); + + return null; + } case DISCONNECTION_REQUEST: { final Thread t = new Thread(new Runnable() { @Override @@ -561,7 +582,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException { readLock.lock(); try { - logger.info("Received flow request message from manager."); + logger.info("Received flow request message from cluster coordinator."); // create the response final FlowResponseMessage response = new FlowResponseMessage(); @@ -631,7 +652,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private void handleReconnectionRequest(final ReconnectionRequestMessage request) { try { - logger.info("Processing reconnection request from manager."); + logger.info("Processing reconnection request from cluster coordinator."); // reconnect ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), @@ -662,8 +683,48 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } + private void handleOffloadRequest(final OffloadMessage request) throws InterruptedException { + logger.info("Received offload request message from cluster coordinator with explanation: " + request.getExplanation()); + offload(request.getExplanation()); + } + + private void offload(final String explanation) throws InterruptedException { + writeLock.lock(); + try { + + logger.info("Offloading node due to " + explanation); + + // mark node as offloading + controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation)); + // request to stop all processors on node + controller.stopAllProcessors(); + // request to stop all remote process groups + controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting); + // terminate all processors + controller.getRootGroup().findAllProcessors() + // filter stream, only stopped processors can be terminated + .stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED) + .forEach(pn -> pn.getProcessGroup().terminateProcessor(pn)); + // offload all queues on node + controller.getAllQueues().forEach(FlowFileQueue::offloadQueue); + // wait for rebalance of flowfiles on all queues + while (controller.getControllerStatus().getQueuedCount() > 0) { + logger.debug("Offloading queues on node {}, remaining queued count: {}", getNodeId(), controller.getControllerStatus().getQueuedCount()); + Thread.sleep(1000); + } + // finish offload + controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation)); + clusterCoordinator.finishNodeOffload(getNodeId()); + + logger.info("Node offloaded due to " + explanation); + + } finally { + writeLock.unlock(); + } + } + private void handleDisconnectionRequest(final DisconnectMessage request) { - logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation()); + logger.info("Received disconnection request message from cluster coordinator with explanation: " + request.getExplanation()); disconnect(request.getExplanation()); } @@ -829,11 +890,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } else if (response.getRejectionReason() != null) { logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason()); - // set response to null and treat a firewall blockage the same as getting no response from manager + // set response to null and treat a firewall blockage the same as getting no response from cluster coordinator response = null; break; } else { - // we received a successful connection response from manager + // we received a successful connection response from cluster coordinator break; } } catch (final NoClusterCoordinatorException ncce) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index cab41e8a8e..ee222f4f2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -74,6 +74,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow public void stopLoadBalancing() { } + @Override + public void offloadQueue() { + } + @Override public boolean isActivelyLoadBalancing() { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 193a9610e6..4c9188b4ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -40,6 +40,7 @@ import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner; import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition; +import org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner; import org.apache.nifi.controller.queue.clustered.partition.QueuePartition; import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition; import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition; @@ -113,6 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple private QueuePartition[] queuePartitions; private FlowFilePartitioner partitioner; private boolean stopped = true; + private boolean offloaded = false; public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo, @@ -204,6 +206,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple setFlowFilePartitioner(partitioner); } + @Override + public void offloadQueue() { + if (clusterCoordinator == null) { + // Not clustered, so don't change partitions + return; + } + + offloaded = true; + + // TODO need to be able to reset the partitioner to the previous partitioner if this node is reconnected to the cluster + setFlowFilePartitioner(new NonLocalPartitionPartitioner()); + } + public synchronized void startLoadBalancing() { logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this); @@ -551,6 +566,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return; } + if (offloaded) { + logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the queue has been offloaded", this); + return; + } + logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers); for (final QueuePartition queuePartition : queuePartitions) { queuePartition.stop(); @@ -968,6 +988,20 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } } + @Override + public void onNodeOffloaded(final NodeIdentifier nodeId) { + partitionWriteLock.lock(); + try { + final Set updatedNodeIds = new HashSet<>(nodeIdentifiers); + updatedNodeIds.remove(nodeId); + + logger.debug("Node Identifier {} offloaded. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); + setNodeIdentifiers(updatedNodeIds, false); + } finally { + partitionWriteLock.unlock(); + } + } + @Override public void onNodeRemoved(final NodeIdentifier nodeId) { partitionWriteLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java new file mode 100644 index 0000000000..cffaefd5cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Returns remote partitions when queried for a partition; never returns the {@link LocalQueuePartition}. + */ +public class NonLocalPartitionPartitioner implements FlowFilePartitioner { + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) { + QueuePartition remotePartition = null; + final long startIndex = counter.getAndIncrement(); + for (int i = 0, numPartitions = partitions.length; i < numPartitions && remotePartition == null; ++i) { + int index = (int) ((startIndex + i) % numPartitions); + QueuePartition partition = partitions[index]; + if (!partition.equals(localPartition)) { + remotePartition = partition; + } + } + + if (remotePartition == null) { + throw new IllegalStateException("Could not determine a remote partition"); + } + + return remotePartition; + } + + @Override + public boolean isRebalanceOnClusterResize() { + return false; + } + + + @Override + public boolean isRebalanceOnFailure() { + return true; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 402ce06ad1..878ad130d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -109,6 +109,10 @@ public class TestWriteAheadFlowFileRepository { public void stopLoadBalancing() { } + @Override + public void offloadQueue() { + } + @Override public boolean isActivelyLoadBalancing() { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index b8ac88a58b..4ef241e8f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -47,6 +47,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -1124,6 +1125,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeConnect(nodeId, userDn); + } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { + clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, + "User " + userDn + " requested that node be offloaded"); } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " requested that node be disconnected from cluster"); @@ -4702,7 +4706,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier); - if (!nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { + if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + " because it is not disconnected, current state = " + nodeConnectionStatus.getState()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java new file mode 100644 index 0000000000..890a8a6726 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.config; + +import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +/** + * Maps illegal node offload exceptions into client responses. + */ +@Provider +public class IllegalNodeOffloadExceptionMapper implements ExceptionMapper { + + private static final Logger logger = LoggerFactory.getLogger(IllegalNodeOffloadExceptionMapper.class); + + @Override + public Response toResponse(IllegalNodeOffloadException exception) { + logger.info(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT)); + + if (logger.isDebugEnabled()) { + logger.debug(StringUtils.EMPTY, exception); + } + return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build(); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json index eff9e33ce8..0bbd0ed00c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json @@ -30,7 +30,7 @@ "d3-selection-multi": "1.0.1", "jquery-minicolors": "2.1.10", "jquery-ui-dist": "1.12.1", - "font-awesome": "4.6.1", + "font-awesome": "4.7.0", "jquery": "3.1.1", "reset.css": "2.0.2", "jquery-form": "3.50.0", diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js index 095f7149f4..0dc74d7cad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js @@ -528,6 +528,8 @@ promptForConnect(item); } else if (target.hasClass('prompt-for-removal')) { promptForRemoval(item); + } else if (target.hasClass('prompt-for-offload')) { + promptForOffload(item); } else if (target.hasClass('prompt-for-disconnect')) { promptForDisconnect(item); } @@ -630,19 +632,29 @@ var actionFormatter = function (row, cell, value, columnDef, dataContext) { var canDisconnect = false; var canConnect = false; + var isOffloaded = false; // determine the current status if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') { canDisconnect = true; - } else if (dataContext.status === 'DISCONNECTED') { + } + if (dataContext.status === 'DISCONNECTED') { canConnect = true; } + if (dataContext.status === 'OFFLOADED') { + isOffloaded = true; + } // return the appropriate markup if (canConnect) { - return '
'; + return '
' + + '
' + + '
'; } else if (canDisconnect) { return '
'; + } else if (isOffloaded) { + return '
' + + '
'; } else { return '
 
'; } @@ -946,6 +958,50 @@ }).fail(nfErrorHandler.handleAjaxError); }; + /** + * Prompts to verify node offload. + * + * @argument {object} node The node + */ + var promptForOffload = function (node) { + nfDialog.showYesNoDialog({ + headerText: 'Offload Node', + dialogContent: 'Offload \'' + formatNodeAddress(node) + '\'?', + yesHandler: function () { + offload(node.nodeId); + } + }); + }; + + /** + * Offloads the node in the specified row. + * + * @argument {string} nodeId The node id + */ + var offload = function (nodeId) { + var entity = { + 'node': { + 'nodeId': nodeId, + 'status': 'OFFLOADING' + } + }; + + $.ajax({ + type: 'PUT', + url: config.urls.nodes + '/' + encodeURIComponent(nodeId), + data: JSON.stringify(entity), + dataType: 'json', + contentType: 'application/json' + }).done(function (response) { + var node = response.node; + + // update the node in the table + var clusterGrid = $('#cluster-nodes-table').data('gridInstance'); + var clusterData = clusterGrid.getData(); + clusterData.updateItem(node.nodeId, node); + }).fail(nfErrorHandler.handleAjaxError); + }; + /** * Prompts to verify node disconnection. * diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java index 22821ee384..6cb3226495 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.toolkit.cli.impl.client.nifi; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryClientsEntity; @@ -34,4 +36,16 @@ public interface ControllerClient { RegistryClientEntity updateRegistryClient(RegistryClientEntity registryClientEntity) throws NiFiClientException, IOException; + NodeEntity connectNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + + NodeEntity deleteNode(String nodeId) throws NiFiClientException, IOException; + + NodeEntity disconnectNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + + NodeEntity getNode(String nodeId) throws NiFiClientException, IOException; + + ClusterEntity getNodes() throws NiFiClientException, IOException; + + NodeEntity offloadNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java index 9c9ffc49d3..a162790468 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java @@ -19,6 +19,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi.impl; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryClientsEntity; @@ -104,4 +106,89 @@ public class JerseyControllerClient extends AbstractJerseyClient implements Cont }); } + @Override + public NodeEntity deleteNode(final String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + return executeAction("Error deleting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).delete(NodeEntity.class); + }); + } + + @Override + public NodeEntity connectNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error connecting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity offloadNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error offloading node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity disconnectNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error disconnecting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity getNode(String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + return executeAction("Error retrieving node status", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).get(NodeEntity.class); + }); + } + + @Override + public ClusterEntity getNodes() throws NiFiClientException, IOException { + return executeAction("Error retrieving node status", () -> { + final WebTarget target = controllerTarget.path("cluster"); + + return getRequestBuilder(target).get(ClusterEntity.class); + }); + } } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java index ad15036f3b..171e6cf85e 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java @@ -49,6 +49,9 @@ public enum CommandOption { SRC_FLOW_ID("sf", "sourceFlowIdentifier", "A flow identifier from the source registry", true), SRC_FLOW_VERSION("sfv", "sourceFlowVersion", "A version of a flow from the source registry", true), + // NiFi - Nodes + NIFI_NODE_ID("nnid", "nifiNodeId", "The ID of a node in the NiFi cluster", true), + // NiFi - Registries REGISTRY_CLIENT_ID("rcid", "registryClientId", "The id of a registry client", true), REGISTRY_CLIENT_NAME("rcn", "registryClientName", "The name of the registry client", true), diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index 00a38a222c..298b709fee 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -21,6 +21,12 @@ import org.apache.nifi.toolkit.cli.impl.command.AbstractCommandGroup; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.ClusterSummary; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.CurrentUser; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetRootId; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.ConnectNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.OffloadNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.DeleteNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.DisconnectNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.GetNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.GetNodes; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeVersion; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEnableControllerServices; @@ -58,7 +64,13 @@ public class NiFiCommandGroup extends AbstractCommandGroup { final List commands = new ArrayList<>(); commands.add(new CurrentUser()); commands.add(new ClusterSummary()); + commands.add(new ConnectNode()); + commands.add(new DeleteNode()); + commands.add(new DisconnectNode()); commands.add(new GetRootId()); + commands.add(new GetNode()); + commands.add(new GetNodes()); + commands.add(new OffloadNode()); commands.add(new ListRegistryClients()); commands.add(new CreateRegistryClient()); commands.add(new UpdateRegistryClient()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java new file mode 100644 index 0000000000..8ec006692b --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for offloading a node of the NiFi cluster. + */ +public class ConnectNode extends AbstractNiFiCommand { + + public ConnectNode() { + super("connect-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Connects a node to the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There are no constants for the CONNECT node statuses + nodeDto.setStatus("CONNECTING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.connectNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java new file mode 100644 index 0000000000..280e625fd9 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.OkResult; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for deleting a node from the NiFi cluster. + */ +public class DeleteNode extends AbstractNiFiCommand { + + public DeleteNode() { + super("delete-node", OkResult.class); + } + + @Override + public String getDescription() { + return "Deletes a node from the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public OkResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + controllerClient.deleteNode(nodeId); + return new OkResult(getContext().isInteractive()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java new file mode 100644 index 0000000000..98fa03a115 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for disconnecting a node from the NiFi cluster. + */ +public class DisconnectNode extends AbstractNiFiCommand { + + public DisconnectNode() { + super("disconnect-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Disconnects a node from the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There's no constant for node status in + nodeDto.setStatus("DISCONNECTING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.disconnectNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java new file mode 100644 index 0000000000..54687bdb12 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for retrieving the status of the nodes from the NiFi cluster. + */ +public class GetNode extends AbstractNiFiCommand { + + public GetNode() { + super("get-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Retrieves the status for a node in the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeEntity nodeEntityResult = controllerClient.getNode(nodeId); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java new file mode 100644 index 0000000000..368fb4ddab --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodesResult; +import org.apache.nifi.web.api.entity.ClusterEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for retrieving the status of the nodes from the NiFi cluster. + */ +public class GetNodes extends AbstractNiFiCommand { + + public GetNodes() { + super("get-nodes", NodesResult.class); + } + + @Override + public String getDescription() { + return "Retrieves statuses for the nodes of the NiFi cluster."; + } + + @Override + public NodesResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final ControllerClient controllerClient = client.getControllerClient(); + + ClusterEntity clusterEntityResult = controllerClient.getNodes(); + return new NodesResult(getResultType(properties), clusterEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java new file mode 100644 index 0000000000..aa759b1c7d --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for offloading a node of the NiFi cluster. + */ +public class OffloadNode extends AbstractNiFiCommand { + + public OffloadNode() { + super("offload-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Offloads a node of the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There are no constants for the OFFLOAD node statuses + nodeDto.setStatus("OFFLOADING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.offloadNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java new file mode 100644 index 0000000000..3e1efdf46e --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.result; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.toolkit.cli.api.ResultType; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.io.PrintStream; + +public class NodeResult extends AbstractWritableResult { + + private final NodeEntity nodeEntity; + + public NodeResult(ResultType resultType, NodeEntity nodeEntity) { + super(resultType); + this.nodeEntity = nodeEntity; + Validate.notNull(nodeEntity); + } + + @Override + public NodeEntity getResult() { + return nodeEntity; + } + + @Override + protected void writeSimpleResult(PrintStream output) throws IOException { + NodeDTO nodeDTO = nodeEntity.getNode(); + output.printf("Node ID: %s\nNode Address: %s\nAPI Port: %s\nNode Status:%s", + nodeDTO.getNodeId(), nodeDTO.getAddress(), nodeDTO.getApiPort(), nodeDTO.getStatus()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java new file mode 100644 index 0000000000..daab27fd82 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.result; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.toolkit.cli.api.ResultType; +import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter; +import org.apache.nifi.toolkit.cli.impl.result.writer.Table; +import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.glassfish.jersey.internal.guava.Lists; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +public class NodesResult extends AbstractWritableResult { + + private final ClusterEntity clusterEntity; + + public NodesResult(ResultType resultType, ClusterEntity clusterEntity) { + super(resultType); + this.clusterEntity = clusterEntity; + Validate.notNull(clusterEntity); + } + + @Override + public ClusterEntity getResult() { + return clusterEntity; + } + + @Override + protected void writeSimpleResult(PrintStream output) throws IOException { + final Table table = new Table.Builder() + .column("#", 3, 3, false) + .column("Node ID", 36, 36, false) + .column("Node Address", 36, 36, true) + .column("API Port", 8, 8, false) + .column("Node Status", 13, 13, false) + .build(); + + List nodes = Lists.newArrayList(clusterEntity.getCluster().getNodes()); + for (int i = 0; i < nodes.size(); ++i) { + NodeDTO nodeDTO = nodes.get(i); + table.addRow(String.valueOf(i), nodeDTO.getNodeId(), nodeDTO.getAddress(), String.valueOf(nodeDTO.getApiPort()), nodeDTO.getStatus()); + } + + final TableWriter tableWriter = new DynamicTableWriter(); + tableWriter.write(table, output); + } +}