From 04d8da8f46c26bd829c4411ea92692589d93278a Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Tue, 18 Sep 2018 17:09:13 -0400 Subject: [PATCH] NIFI-5585 Added capability to offload a node that is disconnected from the cluster. Updated NodeClusterCoordinator to allow idempotent requests to offload a cluster Added capability to connect/delete/disconnect/offload a node from the cluster to the Toolkit CLI Added capability to get the status of nodes from the cluster to the Toolkit CLI Upgraded FontAwesome to 4.7.0 (from 4.6.1) Added icon "fa-upload" for offloading nodes in the cluster table UI --- .../nifi/controller/queue/FlowFileQueue.java | 2 + .../coordination/ClusterCoordinator.java | 18 ++++ .../ClusterTopologyEventListener.java | 2 + .../node/NodeConnectionState.java | 10 ++ .../node/NodeConnectionStatus.java | 45 +++++---- .../coordination/node/OffloadCode.java | 40 ++++++++ .../ClusterCoordinationProtocolSender.java | 9 ++ ...terCoordinationProtocolSenderListener.java | 6 ++ .../protocol/impl/SocketProtocolListener.java | 3 + ...dardClusterCoordinationProtocolSender.java | 26 +++++ .../message/AdaptedNodeConnectionStatus.java | 20 +++- .../message/NodeConnectionStatusAdapter.java | 6 +- .../protocol/jaxb/message/ObjectFactory.java | 5 + .../protocol/message/OffloadMessage.java | 53 +++++++++++ .../protocol/message/ProtocolMessage.java | 1 + .../heartbeat/AbstractHeartbeatMonitor.java | 8 +- .../ThreadPoolRequestReplicator.java | 18 ++++ .../node/NodeClusterCoordinator.java | 94 +++++++++++++++++-- .../IllegalNodeOffloadException.java | 38 ++++++++ .../OffloadedNodeMutableRequestException.java | 39 ++++++++ .../TestAbstractHeartbeatMonitor.java | 11 +++ .../node/TestNodeClusterCoordinator.java | 4 +- .../nifi/controller/StandardFlowService.java | 71 +++++++++++++- .../queue/StandardFlowFileQueue.java | 4 + .../SocketLoadBalancedFlowFileQueue.java | 34 +++++++ .../NonLocalPartitionPartitioner.java | 58 ++++++++++++ .../TestWriteAheadFlowFileRepository.java | 4 + .../nifi/web/StandardNiFiServiceFacade.java | 6 +- .../IllegalNodeOffloadExceptionMapper.java | 46 +++++++++ .../src/main/frontend/package.json | 2 +- .../webapp/js/nf/cluster/nf-cluster-table.js | 60 +++++++++++- .../impl/client/nifi/ControllerClient.java | 14 +++ .../nifi/impl/JerseyControllerClient.java | 87 +++++++++++++++++ .../cli/impl/command/CommandOption.java | 3 + .../impl/command/nifi/NiFiCommandGroup.java | 12 +++ .../impl/command/nifi/nodes/ConnectNode.java | 67 +++++++++++++ .../impl/command/nifi/nodes/DeleteNode.java | 58 ++++++++++++ .../command/nifi/nodes/DisconnectNode.java | 67 +++++++++++++ .../cli/impl/command/nifi/nodes/GetNode.java | 59 ++++++++++++ .../cli/impl/command/nifi/nodes/GetNodes.java | 52 ++++++++++ .../impl/command/nifi/nodes/OffloadNode.java | 67 +++++++++++++ .../toolkit/cli/impl/result/NodeResult.java | 48 ++++++++++ .../toolkit/cli/impl/result/NodesResult.java | 66 +++++++++++++ 43 files changed, 1299 insertions(+), 44 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/OffloadMessage.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 2c7f55b5ae..7cd0e30393 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -267,6 +267,8 @@ public interface FlowFileQueue { void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute); + void offloadQueue(); + LoadBalanceStrategy getLoadBalanceStrategy(); void setLoadBalanceCompression(LoadBalanceCompression compression); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 11786c27c0..2ad0e70971 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.coordination; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -61,6 +62,23 @@ public interface ClusterCoordinator { */ void finishNodeConnection(NodeIdentifier nodeId); + /** + * Indicates that the node has finished being offloaded + * + * @param nodeId the identifier of the node + */ + void finishNodeOffload(NodeIdentifier nodeId); + + /** + * Sends a request to the node to be offloaded. + * The node will be marked as offloading immediately. + * + * @param nodeId the identifier of the node + * @param offloadCode the code that represents why this node is being asked to be offloaded + * @param explanation an explanation as to why the node is being asked to be offloaded + */ + void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation); + /** * Sends a request to the node to disconnect from the cluster. * The node will be marked as disconnected immediately. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java index 54cc4de117..ad9be3d738 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java @@ -23,6 +23,8 @@ public interface ClusterTopologyEventListener { void onNodeAdded(NodeIdentifier nodeId); + void onNodeOffloaded(NodeIdentifier nodeId); + void onNodeRemoved(NodeIdentifier nodeId); void onLocalNodeIdentifierSet(NodeIdentifier localNodeId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java index 8d5824f171..d79552c8cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java @@ -36,12 +36,22 @@ public enum NodeConnectionState { */ CONNECTED, + /** + * A node that is in the process of offloading its flow files from the node. + */ + OFFLOADING, + /** * A node that is in the process of disconnecting from the cluster. * A DISCONNECTING node will always transition to DISCONNECTED. */ DISCONNECTING, + /** + * A node that has offloaded its flow files from the node. + */ + OFFLOADED, + /** * A node that is not connected to the cluster. * A DISCONNECTED node can transition to CONNECTING. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java index 34bd1279e3..7d8a94049c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java @@ -35,47 +35,53 @@ public class NodeConnectionStatus { private final long updateId; private final NodeIdentifier nodeId; private final NodeConnectionState state; + private final OffloadCode offloadCode; private final DisconnectionCode disconnectCode; - private final String disconnectReason; + private final String reason; private final Long connectionRequestTime; public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) { - this(nodeId, state, null, null, null); + this(nodeId, state, null, null, null, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.toString(), null); + this(nodeId, NodeConnectionState.DISCONNECTED, null, disconnectionCode, disconnectionCode.toString(), null); + } + + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, final String offloadExplanation) { + this(nodeId, state, offloadCode, null, offloadExplanation, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String disconnectionExplanation) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null); + this(nodeId, NodeConnectionState.DISCONNECTED, null, disconnectionCode, disconnectionExplanation, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode) { - this(nodeId, state, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); + this(nodeId, state, null, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); } public NodeConnectionStatus(final NodeConnectionStatus status) { - this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime()); + this(status.getNodeIdentifier(), status.getState(), status.getOffloadCode(), status.getDisconnectCode(), status.getReason(), status.getConnectionRequestTime()); } - public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime) { - this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime); + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, + final DisconnectionCode disconnectCode, final String reason, final Long connectionRequestTime) { + this(idGenerator.getAndIncrement(), nodeId, state, offloadCode, disconnectCode, reason, connectionRequestTime); } - public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime) { + public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final OffloadCode offloadCode, + final DisconnectionCode disconnectCode, final String reason, final Long connectionRequestTime) { this.updateId = updateId; this.nodeId = nodeId; this.state = state; + this.offloadCode = offloadCode; if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) { this.disconnectCode = DisconnectionCode.UNKNOWN; - this.disconnectReason = this.disconnectCode.toString(); + this.reason = this.disconnectCode.toString(); } else { this.disconnectCode = disconnectCode; - this.disconnectReason = disconnectReason; + this.reason = reason; } this.connectionRequestTime = (connectionRequestTime == null && state == NodeConnectionState.CONNECTING) ? Long.valueOf(System.currentTimeMillis()) : connectionRequestTime; @@ -93,12 +99,16 @@ public class NodeConnectionStatus { return state; } + public OffloadCode getOffloadCode() { + return offloadCode; + } + public DisconnectionCode getDisconnectCode() { return disconnectCode; } - public String getDisconnectReason() { - return disconnectReason; + public String getReason() { + return reason; } public Long getConnectionRequestTime() { @@ -110,8 +120,11 @@ public class NodeConnectionStatus { final StringBuilder sb = new StringBuilder(); final NodeConnectionState state = getState(); sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", state=").append(state); + if (state == NodeConnectionState.OFFLOADED || state == NodeConnectionState.OFFLOADING) { + sb.append(", Offload Code=").append(getOffloadCode()).append(", Offload Reason=").append(getReason()); + } if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) { - sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason()); + sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getReason()); } sb.append(", updateId=").append(getUpdateIdentifier()); sb.append("]"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java new file mode 100644 index 0000000000..fb4d30bbc5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.node; + +/** + * An enumeration of the reasons that a node may be offloaded + */ +public enum OffloadCode { + + /** + * A user explicitly offloaded the node + */ + OFFLOADED("Node Offloaded"); + + private final String description; + + OffloadCode(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index 986231efd4..b5485ccd56 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol; import java.util.Set; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -40,6 +41,14 @@ public interface ClusterCoordinationProtocolSender { */ ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException; + /** + * Sends an "offload request" message to a node. + * + * @param msg a message + * @throws ProtocolException if communication failed + */ + void offload(OffloadMessage msg) throws ProtocolException; + /** * Sends a "disconnection request" message to a node. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java index ae3a0e5057..74cc6b476a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java @@ -26,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -100,6 +101,11 @@ public class ClusterCoordinationProtocolSenderListener implements ClusterCoordin return sender.requestReconnection(msg); } + @Override + public void offload(OffloadMessage msg) throws ProtocolException { + sender.offload(msg); + } + @Override public void disconnect(DisconnectMessage msg) throws ProtocolException { sender.disconnect(msg); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index 9eaffd37b9..c588a6807d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -210,6 +211,8 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi return ((ConnectionRequestMessage) message).getConnectionRequest().getProposedNodeIdentifier(); case HEARTBEAT: return ((HeartbeatMessage) message).getHeartbeat().getNodeIdentifier(); + case OFFLOAD_REQUEST: + return ((OffloadMessage) message).getNodeId(); case DISCONNECTION_REQUEST: return ((DisconnectMessage) message).getNodeId(); case FLOW_REQUEST: diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index 167ddec932..b21068ffe5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -36,6 +36,7 @@ import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; @@ -128,6 +129,31 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } } + /** + * Requests a node to be offloaded. The configured value for + * handshake timeout is applied to the socket before making the request. + * + * @param msg a message + * @throws ProtocolException if the message failed to be sent + */ + @Override + public void offload(final OffloadMessage msg) throws ProtocolException { + Socket socket = null; + try { + socket = createSocket(msg.getNodeId(), true); + + // marshal message to output stream + try { + final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + marshaller.marshal(msg, socket.getOutputStream()); + } catch (final IOException ioe) { + throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); + } + } finally { + SocketUtils.closeQuietly(socket); + } + } + /** * Requests a node to disconnect from the cluster. The configured value for * handshake timeout is applied to the socket before making the request. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java index c8c4acf646..5eae83e0e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.protocol.jaxb.message; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -25,8 +26,9 @@ public class AdaptedNodeConnectionStatus { private Long updateId; private NodeIdentifier nodeId; private NodeConnectionState state; + private OffloadCode offloadCode; private DisconnectionCode disconnectCode; - private String disconnectReason; + private String reason; private Long connectionRequestTime; public Long getUpdateId() { @@ -53,20 +55,28 @@ public class AdaptedNodeConnectionStatus { this.state = state; } + public OffloadCode getOffloadCode() { + return offloadCode; + } + public DisconnectionCode getDisconnectCode() { return disconnectCode; } + public void setOffloadCode(OffloadCode offloadCode) { + this.offloadCode = offloadCode; + } + public void setDisconnectCode(DisconnectionCode disconnectCode) { this.disconnectCode = disconnectCode; } - public String getDisconnectReason() { - return disconnectReason; + public String getReason() { + return reason; } - public void setDisconnectReason(String disconnectReason) { - this.disconnectReason = disconnectReason; + public void setReason(String reason) { + this.reason = reason; } public Long getConnectionRequestTime() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java index ec209de1f5..47e92e8d2a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java @@ -28,8 +28,9 @@ public class NodeConnectionStatusAdapter extends XmlAdapter offloaded = stateMap.get(NodeConnectionState.OFFLOADED); + if (offloaded != null && !offloaded.isEmpty()) { + if (offloaded.size() == 1) { + throw new OffloadedNodeMutableRequestException("Node " + offloaded.iterator().next() + " is currently offloaded"); + } else { + throw new OffloadedNodeMutableRequestException(offloaded.size() + " Nodes are currently offloaded"); + } + } + + final List offloading = stateMap.get(NodeConnectionState.OFFLOADING); + if (offloading != null && !offloading.isEmpty()) { + if (offloading.size() == 1) { + throw new OffloadedNodeMutableRequestException("Node " + offloading.iterator().next() + " is currently offloading"); + } else { + throw new OffloadedNodeMutableRequestException(offloading.size() + " Nodes are currently offloading"); + } + } + final List disconnected = stateMap.get(NodeConnectionState.DISCONNECTED); if (disconnected != null && !disconnected.isEmpty()) { if (disconnected.size() == 1) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index b24475e085..e165041775 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -34,6 +34,7 @@ import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; @@ -49,6 +50,7 @@ import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; @@ -431,7 +433,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn); } - updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis())); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis())); // create the request final ReconnectionRequestMessage request = new ReconnectionRequestMessage(); @@ -469,6 +471,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } + @Override + public void finishNodeOffload(final NodeIdentifier nodeId) { + final NodeConnectionState state = getConnectionState(nodeId); + if (state == null) { + logger.warn("Attempted to finish node offload for {} but node is not known.", nodeId); + return; + } + + if (state != NodeConnectionState.OFFLOADING) { + logger.warn("Attempted to finish node offload for {} but node is not in a offload state, it is currently {}.", nodeId, state); + return; + } + + logger.info("{} is now offloaded", nodeId); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + + @Override + public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) { + final Set offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED); + if (offloadNodeIds.contains(nodeId)) { + logger.debug("Attempted to offload node but the node is already offloading or offloaded"); + // no need to do anything here, the node is currently offloading or already offloaded + return; + } + + final Set disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED); + if (!disconnectedNodeIds.contains(nodeId)) { + throw new IllegalNodeOffloadException("Cannot offload node " + nodeId + " because it is not currently disconnected"); + } + + logger.info("Requesting that {} is offloaded due to {}", nodeId, explanation == null ? offloadCode : explanation); + + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, offloadCode, explanation)); + + final OffloadMessage request = new OffloadMessage(); + request.setNodeId(nodeId); + request.setExplanation(explanation); + + addNodeEvent(nodeId, "Offload requested due to " + explanation); + onNodeOffloaded(nodeId); + offloadAsynchronously(request, 10, 5); + } + @Override public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); @@ -526,17 +572,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl storeState(); } + private void onNodeOffloaded(final NodeIdentifier nodeId) { + eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId)); + } + private void onNodeRemoved(final NodeIdentifier nodeId) { - eventListeners.stream().forEach(listener -> listener.onNodeRemoved(nodeId)); + eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId)); } private void onNodeAdded(final NodeIdentifier nodeId, final boolean storeState) { if (storeState) { storeState(); } - - - eventListeners.stream().forEach(listener -> listener.onNodeAdded(nodeId)); + eventListeners.forEach(listener -> listener.onNodeAdded(nodeId)); } @Override @@ -821,7 +869,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. final Set nodesToNotify; if (notifyAllNodes) { - nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); + nodesToNotify = getNodeIdentifiers(); // Do not notify ourselves because we already know about the status update. nodesToNotify.remove(getLocalNodeIdentifier()); @@ -841,6 +889,34 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl senderListener.notifyNodeStatusChange(nodesToNotify, message); } + private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) { + final Thread offloadThread = new Thread(new Runnable() { + @Override + public void run() { + final NodeIdentifier nodeId = request.getNodeId(); + + for (int i = 0; i < attempts; i++) { + try { + senderListener.offload(request); + reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation()); + return; + } catch (final Exception e) { + logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e); + + try { + Thread.sleep(retrySeconds * 1000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + }, "Offload " + request.getNodeId()); + + offloadThread.start(); + } + private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) { final Thread disconnectThread = new Thread(new Runnable() { @Override @@ -961,8 +1037,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (oldStatus == null || status.getState() != oldStatus.getState()) { sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString()); - if (status.getDisconnectReason() != null) { - sb.append(" due to ").append(status.getDisconnectReason()); + if (status.getReason() != null) { + sb.append(" due to ").append(status.getReason()); } else if (status.getDisconnectCode() != null) { sb.append(" due to ").append(status.getDisconnectCode().toString()); } @@ -1118,7 +1194,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting."); } - status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()); + status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis()); updateNodeStatus(status); final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(), diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java new file mode 100644 index 0000000000..f1bc6694d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when an offload request is issued to a node that cannot be offloaded (e.g., not currently disconnected). + */ +public class IllegalNodeOffloadException extends IllegalClusterStateException { + + public IllegalNodeOffloadException() { + } + + public IllegalNodeOffloadException(String msg) { + super(msg); + } + + public IllegalNodeOffloadException(Throwable cause) { + super(cause); + } + + public IllegalNodeOffloadException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java new file mode 100644 index 0000000000..3663349735 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's dataflow is to be replicated while one or more nodes are offloaded. + * + */ +public class OffloadedNodeMutableRequestException extends MutableRequestException { + + public OffloadedNodeMutableRequestException() { + } + + public OffloadedNodeMutableRequestException(String msg) { + super(msg); + } + + public OffloadedNodeMutableRequestException(Throwable cause) { + super(cause); + } + + public OffloadedNodeMutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 6ea019d947..4aeff7b3eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -20,6 +20,7 @@ package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -244,6 +245,16 @@ public class TestAbstractHeartbeatMonitor { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } + @Override + public synchronized void finishNodeOffload(NodeIdentifier nodeId) { + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + + @Override + public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) { + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + } + @Override public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index fb06a15dec..5ce2985c03 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -280,7 +280,7 @@ public class TestNodeClusterCoordinator { assertNotNull(statusChange); assertEquals(createNodeId(1), statusChange.getNodeIdentifier()); assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChange.getDisconnectCode()); - assertEquals("Unit Test", statusChange.getDisconnectReason()); + assertEquals("Unit Test", statusChange.getReason()); } @Test @@ -407,7 +407,7 @@ public class TestNodeClusterCoordinator { nodeStatuses.clear(); final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED, - DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L); + null, DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L); final NodeStatusChangeMessage msg = new NodeStatusChangeMessage(); msg.setNodeId(nodeId1); msg.setNodeConnectionStatus(oldStatus); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 7a5c45e087..297595f4df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -23,6 +23,7 @@ import org.apache.nifi.authorization.ManagedAuthorizer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.cluster.ConnectionException; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -36,6 +37,7 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; @@ -44,12 +46,14 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.lifecycle.LifeCycleStartException; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.NarClassLoaders; @@ -381,6 +385,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { public boolean canHandle(final ProtocolMessage msg) { switch (msg.getType()) { case RECONNECTION_REQUEST: + case OFFLOAD_REQUEST: case DISCONNECTION_REQUEST: case FLOW_REQUEST: return true; @@ -415,6 +420,22 @@ public class StandardFlowService implements FlowService, ProtocolHandler { return new ReconnectionResponseMessage(); } + case OFFLOAD_REQUEST: { + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + handleOffloadRequest((OffloadMessage) request); + } catch (InterruptedException e) { + throw new ProtocolException("Could not complete offload request", e); + } + } + }, "Offload Flow Files from Node"); + t.setDaemon(true); + t.start(); + + return null; + } case DISCONNECTION_REQUEST: { final Thread t = new Thread(new Runnable() { @Override @@ -561,7 +582,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException { readLock.lock(); try { - logger.info("Received flow request message from manager."); + logger.info("Received flow request message from cluster coordinator."); // create the response final FlowResponseMessage response = new FlowResponseMessage(); @@ -631,7 +652,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private void handleReconnectionRequest(final ReconnectionRequestMessage request) { try { - logger.info("Processing reconnection request from manager."); + logger.info("Processing reconnection request from cluster coordinator."); // reconnect ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), @@ -662,8 +683,48 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } + private void handleOffloadRequest(final OffloadMessage request) throws InterruptedException { + logger.info("Received offload request message from cluster coordinator with explanation: " + request.getExplanation()); + offload(request.getExplanation()); + } + + private void offload(final String explanation) throws InterruptedException { + writeLock.lock(); + try { + + logger.info("Offloading node due to " + explanation); + + // mark node as offloading + controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation)); + // request to stop all processors on node + controller.stopAllProcessors(); + // request to stop all remote process groups + controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting); + // terminate all processors + controller.getRootGroup().findAllProcessors() + // filter stream, only stopped processors can be terminated + .stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED) + .forEach(pn -> pn.getProcessGroup().terminateProcessor(pn)); + // offload all queues on node + controller.getAllQueues().forEach(FlowFileQueue::offloadQueue); + // wait for rebalance of flowfiles on all queues + while (controller.getControllerStatus().getQueuedCount() > 0) { + logger.debug("Offloading queues on node {}, remaining queued count: {}", getNodeId(), controller.getControllerStatus().getQueuedCount()); + Thread.sleep(1000); + } + // finish offload + controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation)); + clusterCoordinator.finishNodeOffload(getNodeId()); + + logger.info("Node offloaded due to " + explanation); + + } finally { + writeLock.unlock(); + } + } + private void handleDisconnectionRequest(final DisconnectMessage request) { - logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation()); + logger.info("Received disconnection request message from cluster coordinator with explanation: " + request.getExplanation()); disconnect(request.getExplanation()); } @@ -829,11 +890,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } else if (response.getRejectionReason() != null) { logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason()); - // set response to null and treat a firewall blockage the same as getting no response from manager + // set response to null and treat a firewall blockage the same as getting no response from cluster coordinator response = null; break; } else { - // we received a successful connection response from manager + // we received a successful connection response from cluster coordinator break; } } catch (final NoClusterCoordinatorException ncce) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index cab41e8a8e..ee222f4f2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -74,6 +74,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow public void stopLoadBalancing() { } + @Override + public void offloadQueue() { + } + @Override public boolean isActivelyLoadBalancing() { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 193a9610e6..4c9188b4ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -40,6 +40,7 @@ import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner; import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition; +import org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner; import org.apache.nifi.controller.queue.clustered.partition.QueuePartition; import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition; import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition; @@ -113,6 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple private QueuePartition[] queuePartitions; private FlowFilePartitioner partitioner; private boolean stopped = true; + private boolean offloaded = false; public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo, @@ -204,6 +206,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple setFlowFilePartitioner(partitioner); } + @Override + public void offloadQueue() { + if (clusterCoordinator == null) { + // Not clustered, so don't change partitions + return; + } + + offloaded = true; + + // TODO need to be able to reset the partitioner to the previous partitioner if this node is reconnected to the cluster + setFlowFilePartitioner(new NonLocalPartitionPartitioner()); + } + public synchronized void startLoadBalancing() { logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this); @@ -551,6 +566,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return; } + if (offloaded) { + logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the queue has been offloaded", this); + return; + } + logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers); for (final QueuePartition queuePartition : queuePartitions) { queuePartition.stop(); @@ -968,6 +988,20 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } } + @Override + public void onNodeOffloaded(final NodeIdentifier nodeId) { + partitionWriteLock.lock(); + try { + final Set updatedNodeIds = new HashSet<>(nodeIdentifiers); + updatedNodeIds.remove(nodeId); + + logger.debug("Node Identifier {} offloaded. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); + setNodeIdentifiers(updatedNodeIds, false); + } finally { + partitionWriteLock.unlock(); + } + } + @Override public void onNodeRemoved(final NodeIdentifier nodeId) { partitionWriteLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java new file mode 100644 index 0000000000..cffaefd5cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Returns remote partitions when queried for a partition; never returns the {@link LocalQueuePartition}. + */ +public class NonLocalPartitionPartitioner implements FlowFilePartitioner { + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) { + QueuePartition remotePartition = null; + final long startIndex = counter.getAndIncrement(); + for (int i = 0, numPartitions = partitions.length; i < numPartitions && remotePartition == null; ++i) { + int index = (int) ((startIndex + i) % numPartitions); + QueuePartition partition = partitions[index]; + if (!partition.equals(localPartition)) { + remotePartition = partition; + } + } + + if (remotePartition == null) { + throw new IllegalStateException("Could not determine a remote partition"); + } + + return remotePartition; + } + + @Override + public boolean isRebalanceOnClusterResize() { + return false; + } + + + @Override + public boolean isRebalanceOnFailure() { + return true; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 402ce06ad1..878ad130d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -109,6 +109,10 @@ public class TestWriteAheadFlowFileRepository { public void stopLoadBalancing() { } + @Override + public void offloadQueue() { + } + @Override public boolean isActivelyLoadBalancing() { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index b8ac88a58b..4ef241e8f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -47,6 +47,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -1124,6 +1125,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeConnect(nodeId, userDn); + } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { + clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, + "User " + userDn + " requested that node be offloaded"); } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " requested that node be disconnected from cluster"); @@ -4702,7 +4706,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier); - if (!nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { + if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + " because it is not disconnected, current state = " + nodeConnectionStatus.getState()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java new file mode 100644 index 0000000000..890a8a6726 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.config; + +import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +/** + * Maps illegal node offload exceptions into client responses. + */ +@Provider +public class IllegalNodeOffloadExceptionMapper implements ExceptionMapper { + + private static final Logger logger = LoggerFactory.getLogger(IllegalNodeOffloadExceptionMapper.class); + + @Override + public Response toResponse(IllegalNodeOffloadException exception) { + logger.info(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT)); + + if (logger.isDebugEnabled()) { + logger.debug(StringUtils.EMPTY, exception); + } + return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build(); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json index eff9e33ce8..0bbd0ed00c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json @@ -30,7 +30,7 @@ "d3-selection-multi": "1.0.1", "jquery-minicolors": "2.1.10", "jquery-ui-dist": "1.12.1", - "font-awesome": "4.6.1", + "font-awesome": "4.7.0", "jquery": "3.1.1", "reset.css": "2.0.2", "jquery-form": "3.50.0", diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js index 095f7149f4..0dc74d7cad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js @@ -528,6 +528,8 @@ promptForConnect(item); } else if (target.hasClass('prompt-for-removal')) { promptForRemoval(item); + } else if (target.hasClass('prompt-for-offload')) { + promptForOffload(item); } else if (target.hasClass('prompt-for-disconnect')) { promptForDisconnect(item); } @@ -630,19 +632,29 @@ var actionFormatter = function (row, cell, value, columnDef, dataContext) { var canDisconnect = false; var canConnect = false; + var isOffloaded = false; // determine the current status if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') { canDisconnect = true; - } else if (dataContext.status === 'DISCONNECTED') { + } + if (dataContext.status === 'DISCONNECTED') { canConnect = true; } + if (dataContext.status === 'OFFLOADED') { + isOffloaded = true; + } // return the appropriate markup if (canConnect) { - return '
'; + return '
' + + '
' + + '
'; } else if (canDisconnect) { return '
'; + } else if (isOffloaded) { + return '
' + + '
'; } else { return '
 
'; } @@ -946,6 +958,50 @@ }).fail(nfErrorHandler.handleAjaxError); }; + /** + * Prompts to verify node offload. + * + * @argument {object} node The node + */ + var promptForOffload = function (node) { + nfDialog.showYesNoDialog({ + headerText: 'Offload Node', + dialogContent: 'Offload \'' + formatNodeAddress(node) + '\'?', + yesHandler: function () { + offload(node.nodeId); + } + }); + }; + + /** + * Offloads the node in the specified row. + * + * @argument {string} nodeId The node id + */ + var offload = function (nodeId) { + var entity = { + 'node': { + 'nodeId': nodeId, + 'status': 'OFFLOADING' + } + }; + + $.ajax({ + type: 'PUT', + url: config.urls.nodes + '/' + encodeURIComponent(nodeId), + data: JSON.stringify(entity), + dataType: 'json', + contentType: 'application/json' + }).done(function (response) { + var node = response.node; + + // update the node in the table + var clusterGrid = $('#cluster-nodes-table').data('gridInstance'); + var clusterData = clusterGrid.getData(); + clusterData.updateItem(node.nodeId, node); + }).fail(nfErrorHandler.handleAjaxError); + }; + /** * Prompts to verify node disconnection. * diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java index 22821ee384..6cb3226495 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.toolkit.cli.impl.client.nifi; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryClientsEntity; @@ -34,4 +36,16 @@ public interface ControllerClient { RegistryClientEntity updateRegistryClient(RegistryClientEntity registryClientEntity) throws NiFiClientException, IOException; + NodeEntity connectNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + + NodeEntity deleteNode(String nodeId) throws NiFiClientException, IOException; + + NodeEntity disconnectNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + + NodeEntity getNode(String nodeId) throws NiFiClientException, IOException; + + ClusterEntity getNodes() throws NiFiClientException, IOException; + + NodeEntity offloadNode(String nodeId, NodeEntity nodeEntity) throws NiFiClientException, IOException; + } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java index 9c9ffc49d3..a162790468 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java @@ -19,6 +19,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi.impl; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryClientsEntity; @@ -104,4 +106,89 @@ public class JerseyControllerClient extends AbstractJerseyClient implements Cont }); } + @Override + public NodeEntity deleteNode(final String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + return executeAction("Error deleting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).delete(NodeEntity.class); + }); + } + + @Override + public NodeEntity connectNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error connecting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity offloadNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error offloading node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity disconnectNode(final String nodeId, final NodeEntity nodeEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + if (nodeEntity == null) { + throw new IllegalArgumentException("Node entity cannot be null"); + } + + return executeAction("Error disconnecting node", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).put(Entity.entity(nodeEntity, MediaType.APPLICATION_JSON), NodeEntity.class); + }); + } + + @Override + public NodeEntity getNode(String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(nodeId)) { + throw new IllegalArgumentException("Node ID cannot be null or empty"); + } + + return executeAction("Error retrieving node status", () -> { + final WebTarget target = controllerTarget.path("cluster/nodes/" + nodeId); + + return getRequestBuilder(target).get(NodeEntity.class); + }); + } + + @Override + public ClusterEntity getNodes() throws NiFiClientException, IOException { + return executeAction("Error retrieving node status", () -> { + final WebTarget target = controllerTarget.path("cluster"); + + return getRequestBuilder(target).get(ClusterEntity.class); + }); + } } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java index ad15036f3b..171e6cf85e 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java @@ -49,6 +49,9 @@ public enum CommandOption { SRC_FLOW_ID("sf", "sourceFlowIdentifier", "A flow identifier from the source registry", true), SRC_FLOW_VERSION("sfv", "sourceFlowVersion", "A version of a flow from the source registry", true), + // NiFi - Nodes + NIFI_NODE_ID("nnid", "nifiNodeId", "The ID of a node in the NiFi cluster", true), + // NiFi - Registries REGISTRY_CLIENT_ID("rcid", "registryClientId", "The id of a registry client", true), REGISTRY_CLIENT_NAME("rcn", "registryClientName", "The name of the registry client", true), diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index 00a38a222c..298b709fee 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -21,6 +21,12 @@ import org.apache.nifi.toolkit.cli.impl.command.AbstractCommandGroup; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.ClusterSummary; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.CurrentUser; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetRootId; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.ConnectNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.OffloadNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.DeleteNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.DisconnectNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.GetNode; +import org.apache.nifi.toolkit.cli.impl.command.nifi.nodes.GetNodes; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeVersion; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEnableControllerServices; @@ -58,7 +64,13 @@ public class NiFiCommandGroup extends AbstractCommandGroup { final List commands = new ArrayList<>(); commands.add(new CurrentUser()); commands.add(new ClusterSummary()); + commands.add(new ConnectNode()); + commands.add(new DeleteNode()); + commands.add(new DisconnectNode()); commands.add(new GetRootId()); + commands.add(new GetNode()); + commands.add(new GetNodes()); + commands.add(new OffloadNode()); commands.add(new ListRegistryClients()); commands.add(new CreateRegistryClient()); commands.add(new UpdateRegistryClient()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java new file mode 100644 index 0000000000..8ec006692b --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/ConnectNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for offloading a node of the NiFi cluster. + */ +public class ConnectNode extends AbstractNiFiCommand { + + public ConnectNode() { + super("connect-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Connects a node to the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There are no constants for the CONNECT node statuses + nodeDto.setStatus("CONNECTING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.connectNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java new file mode 100644 index 0000000000..280e625fd9 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DeleteNode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.OkResult; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for deleting a node from the NiFi cluster. + */ +public class DeleteNode extends AbstractNiFiCommand { + + public DeleteNode() { + super("delete-node", OkResult.class); + } + + @Override + public String getDescription() { + return "Deletes a node from the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public OkResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + controllerClient.deleteNode(nodeId); + return new OkResult(getContext().isInteractive()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java new file mode 100644 index 0000000000..98fa03a115 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for disconnecting a node from the NiFi cluster. + */ +public class DisconnectNode extends AbstractNiFiCommand { + + public DisconnectNode() { + super("disconnect-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Disconnects a node from the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There's no constant for node status in + nodeDto.setStatus("DISCONNECTING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.disconnectNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java new file mode 100644 index 0000000000..54687bdb12 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNode.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for retrieving the status of the nodes from the NiFi cluster. + */ +public class GetNode extends AbstractNiFiCommand { + + public GetNode() { + super("get-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Retrieves the status for a node in the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeEntity nodeEntityResult = controllerClient.getNode(nodeId); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java new file mode 100644 index 0000000000..368fb4ddab --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/GetNodes.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodesResult; +import org.apache.nifi.web.api.entity.ClusterEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for retrieving the status of the nodes from the NiFi cluster. + */ +public class GetNodes extends AbstractNiFiCommand { + + public GetNodes() { + super("get-nodes", NodesResult.class); + } + + @Override + public String getDescription() { + return "Retrieves statuses for the nodes of the NiFi cluster."; + } + + @Override + public NodesResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final ControllerClient controllerClient = client.getControllerClient(); + + ClusterEntity clusterEntityResult = controllerClient.getNodes(); + return new NodesResult(getResultType(properties), clusterEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java new file mode 100644 index 0000000000..aa759b1c7d --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/OffloadNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.nodes; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ControllerClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.NodeResult; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command for offloading a node of the NiFi cluster. + */ +public class OffloadNode extends AbstractNiFiCommand { + + public OffloadNode() { + super("offload-node", NodeResult.class); + } + + @Override + public String getDescription() { + return "Offloads a node of the NiFi cluster."; + } + + @Override + protected void doInitialize(Context context) { + addOption(CommandOption.NIFI_NODE_ID.createOption()); + } + + @Override + public NodeResult doExecute(NiFiClient client, Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String nodeId = getRequiredArg(properties, CommandOption.NIFI_NODE_ID); + final ControllerClient controllerClient = client.getControllerClient(); + + NodeDTO nodeDto = new NodeDTO(); + nodeDto.setNodeId(nodeId); + // TODO There are no constants for the OFFLOAD node statuses + nodeDto.setStatus("OFFLOADING"); + NodeEntity nodeEntity = new NodeEntity(); + nodeEntity.setNode(nodeDto); + NodeEntity nodeEntityResult = controllerClient.offloadNode(nodeId, nodeEntity); + return new NodeResult(getResultType(properties), nodeEntityResult); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java new file mode 100644 index 0000000000..3e1efdf46e --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodeResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.result; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.toolkit.cli.api.ResultType; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.NodeEntity; + +import java.io.IOException; +import java.io.PrintStream; + +public class NodeResult extends AbstractWritableResult { + + private final NodeEntity nodeEntity; + + public NodeResult(ResultType resultType, NodeEntity nodeEntity) { + super(resultType); + this.nodeEntity = nodeEntity; + Validate.notNull(nodeEntity); + } + + @Override + public NodeEntity getResult() { + return nodeEntity; + } + + @Override + protected void writeSimpleResult(PrintStream output) throws IOException { + NodeDTO nodeDTO = nodeEntity.getNode(); + output.printf("Node ID: %s\nNode Address: %s\nAPI Port: %s\nNode Status:%s", + nodeDTO.getNodeId(), nodeDTO.getAddress(), nodeDTO.getApiPort(), nodeDTO.getStatus()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java new file mode 100644 index 0000000000..daab27fd82 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/NodesResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.result; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.toolkit.cli.api.ResultType; +import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter; +import org.apache.nifi.toolkit.cli.impl.result.writer.Table; +import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.glassfish.jersey.internal.guava.Lists; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +public class NodesResult extends AbstractWritableResult { + + private final ClusterEntity clusterEntity; + + public NodesResult(ResultType resultType, ClusterEntity clusterEntity) { + super(resultType); + this.clusterEntity = clusterEntity; + Validate.notNull(clusterEntity); + } + + @Override + public ClusterEntity getResult() { + return clusterEntity; + } + + @Override + protected void writeSimpleResult(PrintStream output) throws IOException { + final Table table = new Table.Builder() + .column("#", 3, 3, false) + .column("Node ID", 36, 36, false) + .column("Node Address", 36, 36, true) + .column("API Port", 8, 8, false) + .column("Node Status", 13, 13, false) + .build(); + + List nodes = Lists.newArrayList(clusterEntity.getCluster().getNodes()); + for (int i = 0; i < nodes.size(); ++i) { + NodeDTO nodeDTO = nodes.get(i); + table.addRow(String.valueOf(i), nodeDTO.getNodeId(), nodeDTO.getAddress(), String.valueOf(nodeDTO.getApiPort()), nodeDTO.getStatus()); + } + + final TableWriter tableWriter = new DynamicTableWriter(); + tableWriter.write(table, output); + } +}