From 17a36c6fd525dba837e13a71a52ddfd224782fae Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 28 Sep 2016 11:07:22 +0900 Subject: [PATCH] NIFI-2825: Fix S2S getPeers flow file count - Added ClusterWorkload message to retrieve workload information from a cluster coordinator - Use cluster workload to return queued flow file count to site-to-site client so that it can calculate distribution of data transfer This closes #1084. Signed-off-by: Bryan Bende --- .../coordination/ClusterCoordinator.java | 8 +++ .../coordination/node/NodeWorkload.java | 71 +++++++++++++++++++ .../protocol/AbstractNodeProtocolSender.java | 17 +++++ .../cluster/protocol/NodeProtocolSender.java | 10 +++ .../impl/NodeProtocolSenderListener.java | 7 ++ .../protocol/jaxb/message/ObjectFactory.java | 11 +++ .../ClusterWorkloadRequestMessage.java | 30 ++++++++ .../ClusterWorkloadResponseMessage.java | 43 +++++++++++ .../protocol/message/ProtocolMessage.java | 4 +- .../jaxb/message/TestJaxbProtocolUtils.java | 56 +++++++++++++++ .../ClusterProtocolHeartbeatMonitor.java | 39 ++++++++-- .../node/NodeClusterCoordinator.java | 15 +++- .../NodeClusterCoordinatorFactoryBean.java | 5 +- .../TestAbstractHeartbeatMonitor.java | 6 ++ .../node/TestNodeClusterCoordinator.java | 8 ++- .../apache/nifi/cluster/integration/Node.java | 3 +- .../ClusterCoordinatorNodeInformant.java | 23 +++--- .../nifi/web/api/SiteToSiteResource.java | 29 ++++---- .../nifi/web/api/TestSiteToSiteResource.java | 63 ++++++++++++++++ 19 files changed, 413 insertions(+), 35 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 723a374820..1083fe6803 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.coordination; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -24,6 +25,7 @@ import java.util.Set; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.coordination.node.NodeWorkload; import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.reporting.Severity; @@ -236,4 +238,10 @@ public interface ClusterCoordinator { * @return the current status of Flow Election. */ String getFlowElectionStatus(); + + /** + * @return the current cluster workload retrieved from the cluster coordinator. + * @throws IOException thrown when it failed to communicate with the cluster coordinator. + */ + Map getClusterWorkload() throws IOException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java new file mode 100644 index 0000000000..be5653e91b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.node; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "nodeWorkload") +public class NodeWorkload { + + private long reportedTimestamp; + private int flowFileCount; + private long flowFileBytes; + private int activeThreadCount; + private long systemStartTime; + + public long getReportedTimestamp() { + return reportedTimestamp; + } + + public void setReportedTimestamp(long reportedTimestamp) { + this.reportedTimestamp = reportedTimestamp; + } + + public int getFlowFileCount() { + return flowFileCount; + } + + public void setFlowFileCount(int flowFileCount) { + this.flowFileCount = flowFileCount; + } + + public long getFlowFileBytes() { + return flowFileBytes; + } + + public void setFlowFileBytes(long flowFileBytes) { + this.flowFileBytes = flowFileBytes; + } + + public int getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(int activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public long getSystemStartTime() { + return systemStartTime; + } + + public void setSystemStartTime(long systemStartTime) { + this.systemStartTime = systemStartTime; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java index 22d6ebc651..db3fc1d9f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -94,6 +96,21 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'"); } + @Override + public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadRequestMessage msg) throws ProtocolException { + final InetSocketAddress serviceAddress; + try { + serviceAddress = getServiceAddress(); + } catch (IOException e) { + throw new ProtocolException("Failed to getServiceAddress due to " + e, e); + } + final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort()); + if (MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) { + return (ClusterWorkloadResponseMessage) responseMessage; + } + + throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'"); + } private Socket createSocket() { InetSocketAddress socketAddress = null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java index fcf519540e..bfcc62cdbc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.protocol; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -49,4 +51,12 @@ public interface NodeProtocolSender { * @return the response from the Cluster Coordinator */ HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException; + + /** + * Sends a "cluster workflow request" message to the Cluster Coordinator. + * @param msg a request message + * @return the response from the Cluster Coordinator + * @throws ProtocolException if communication failed + */ + ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java index 1b0aeeab32..d13b3d3a96 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java @@ -24,6 +24,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -96,4 +98,9 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { return sender.heartbeat(msg, address); } + + @Override + public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException { + return sender.clusterWorkload(msg); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java index afa87b9f8a..9a594a403e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java @@ -25,6 +25,8 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; @@ -101,4 +103,13 @@ public class ObjectFactory { public HeartbeatResponseMessage createHeartbeatResponse() { return new HeartbeatResponseMessage(); } + + public ClusterWorkloadRequestMessage createClusterWorkloadRequest() { + return new ClusterWorkloadRequestMessage(); + } + + public ClusterWorkloadResponseMessage createClusterWorkloadResponse() { + return new ClusterWorkloadResponseMessage(); + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java new file mode 100644 index 0000000000..d8f87a480e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "clusterWorkloadRequest") +public class ClusterWorkloadRequestMessage extends ProtocolMessage { + + @Override + public MessageType getType() { + return MessageType.CLUSTER_WORKLOAD_REQUEST; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java new file mode 100644 index 0000000000..2852c482de --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol.message; + +import org.apache.nifi.cluster.coordination.node.NodeWorkload; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Map; + +@XmlRootElement(name = "clusterWorkloadResponse") +public class ClusterWorkloadResponseMessage extends ProtocolMessage { + + private Map nodeWorkloads; + + @Override + public MessageType getType() { + return MessageType.CLUSTER_WORKLOAD_RESPONSE; + } + + public Map getNodeWorkloads() { + return nodeWorkloads; + } + + public void setNodeWorkloads(Map nodeWorkloads) { + this.nodeWorkloads = nodeWorkloads; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index 1d0d1159e4..1cab62f8ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -35,7 +35,9 @@ public abstract class ProtocolMessage { HEARTBEAT_RESPONSE, NODE_CONNECTION_STATUS_REQUEST, NODE_CONNECTION_STATUS_RESPONSE, - NODE_STATUS_CHANGE; + NODE_STATUS_CHANGE, + CLUSTER_WORKLOAD_REQUEST, + CLUSTER_WORKLOAD_RESPONSE } public abstract MessageType getType(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java index 4fa53e8889..8c2cca6fed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java @@ -23,12 +23,16 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; import javax.xml.bind.JAXBException; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.coordination.node.NodeWorkload; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.DataFlow; @@ -38,6 +42,8 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.web.Revision; @@ -124,4 +130,54 @@ public class TestJaxbProtocolUtils { final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); assertTrue(unmarshalled instanceof HeartbeatMessage); } + + @Test + public void testRoundTripClusterWorkloadRequest() throws JAXBException { + final ClusterWorkloadRequestMessage msg = new ClusterWorkloadRequestMessage(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos); + final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); + assertTrue(unmarshalled instanceof ClusterWorkloadRequestMessage); + } + + @Test + public void testRoundTripClusterWorkloadResponse() throws JAXBException { + final ClusterWorkloadResponseMessage msg = new ClusterWorkloadResponseMessage(); + final Map expectedNodeWorkloads = new HashMap<>(); + + IntStream.range(1, 4).forEach(i -> { + final String hostname = "node" + i; + final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080, hostname, 8081, hostname, 8082, 8083, false); + final NodeWorkload workload = new NodeWorkload(); + workload.setReportedTimestamp(System.currentTimeMillis() - 1000); + workload.setSystemStartTime(System.currentTimeMillis()); + workload.setActiveThreadCount(i); + workload.setFlowFileCount(i * 10); + workload.setFlowFileBytes(i * 10 * 1024); + expectedNodeWorkloads.put(nodeId, workload); + }); + msg.setNodeWorkloads(expectedNodeWorkloads); + + // Marshall. + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos); + + // Un-marshall. + final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); + assertTrue(unmarshalled instanceof ClusterWorkloadResponseMessage); + + // Assert result. + final ClusterWorkloadResponseMessage response = (ClusterWorkloadResponseMessage) unmarshalled; + assertEquals(expectedNodeWorkloads.size(), response.getNodeWorkloads().size()); + response.getNodeWorkloads().entrySet().stream().forEach(entry -> { + assertTrue(expectedNodeWorkloads.containsKey(entry.getKey())); + final NodeWorkload w = entry.getValue(); + NodeWorkload expectedW = expectedNodeWorkloads.get(entry.getKey()); + assertEquals(expectedW.getActiveThreadCount(), w.getActiveThreadCount()); + assertEquals(expectedW.getReportedTimestamp(), w.getReportedTimestamp()); + assertEquals(expectedW.getSystemStartTime(), w.getSystemStartTime()); + assertEquals(expectedW.getFlowFileBytes(), w.getFlowFileBytes()); + assertEquals(expectedW.getFlowFileCount(), w.getFlowFileCount()); + }); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 9f620d905b..6a8e575c6c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -18,6 +18,7 @@ package org.apache.nifi.cluster.coordination.heartbeat; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +31,7 @@ import javax.xml.bind.Unmarshaller; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.coordination.node.NodeWorkload; import org.apache.nifi.cluster.protocol.Heartbeat; import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -38,6 +40,8 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.util.NiFiProperties; @@ -130,11 +134,18 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { - if (msg.getType() != MessageType.HEARTBEAT) { - throw new ProtocolException("Cannot handle message of type " + msg.getType()); + switch (msg.getType()) { + case HEARTBEAT: + return handleHeartbeat((HeartbeatMessage) msg); + case CLUSTER_WORKLOAD_REQUEST: + return handleClusterWorkload((ClusterWorkloadRequestMessage) msg); + default: + throw new ProtocolException("Cannot handle message of type " + msg.getType()); } + } - final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg; + private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) { + final HeartbeatMessage heartbeatMsg = msg; final Heartbeat heartbeat = heartbeatMsg.getHeartbeat(); final NodeIdentifier nodeId = heartbeat.getNodeIdentifier(); @@ -169,6 +180,26 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im return responseMessage; } + private ProtocolMessage handleClusterWorkload(final ClusterWorkloadRequestMessage msg) { + + final ClusterWorkloadResponseMessage response = new ClusterWorkloadResponseMessage(); + final Map workloads = new HashMap<>(); + getLatestHeartbeats().values().stream() + .filter(hb -> NodeConnectionState.CONNECTED.equals(hb.getConnectionStatus().getState())) + .forEach(hb -> { + NodeWorkload wl = new NodeWorkload(); + wl.setReportedTimestamp(hb.getTimestamp()); + wl.setSystemStartTime(hb.getSystemStartTime()); + wl.setActiveThreadCount(hb.getActiveThreadCount()); + wl.setFlowFileCount(hb.getFlowFileCount()); + wl.setFlowFileBytes(hb.getFlowFileBytes()); + workloads.put(hb.getNodeIdentifier(), wl); + }); + response.setNodeWorkloads(workloads); + + return response; + } + private List getUpdatedStatuses(final List nodeStatusList) { // Map node's statuses by NodeIdentifier for quick & easy lookup final Map nodeStatusMap = nodeStatusList.stream() @@ -201,6 +232,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public boolean canHandle(ProtocolMessage msg) { - return msg.getType() == MessageType.HEARTBEAT; + return msg.getType() == MessageType.HEARTBEAT || msg.getType() == MessageType.CLUSTER_WORKLOAD_REQUEST; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index a6a6009f84..4b74e1b75a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -49,10 +49,13 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; +import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; @@ -88,6 +91,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final LeaderElectionManager leaderElectionManager; private final AtomicLong latestUpdateId = new AtomicLong(-1); private final FlowElection flowElection; + private final NodeProtocolSender nodeProtocolSender; private volatile FlowService flowService; private volatile boolean connected; @@ -98,7 +102,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final ConcurrentMap> nodeEvents = new ConcurrentHashMap<>(); public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager, - final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) { + final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties, + final NodeProtocolSender nodeProtocolSender) { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; @@ -107,6 +112,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl this.nifiProperties = nifiProperties; this.leaderElectionManager = leaderElectionManager; this.flowElection = flowElection; + this.nodeProtocolSender = nodeProtocolSender; senderListener.addHandler(this); } @@ -1043,4 +1049,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl public boolean isConnected() { return connected; } + + @Override + public Map getClusterWorkload() throws IOException { + final ClusterWorkloadRequestMessage request = new ClusterWorkloadRequestMessage(); + final ClusterWorkloadResponseMessage response = nodeProtocolSender.clusterWorkload(request); + return response.getNodeWorkloads(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java index 2845a011db..ac79a42d68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java @@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring; import org.apache.nifi.cluster.coordination.flow.FlowElection; import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; +import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.events.EventReporter; @@ -46,8 +47,8 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean getClusterWorkload() throws IOException { + return null; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index be9b862f1e..e2577a9591 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -80,7 +80,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) { + coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { nodeStatuses.add(updatedStatus); @@ -135,7 +135,8 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) { + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), + null, revisionManager, createProperties(), null) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@ -175,7 +176,8 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) { + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), + null, revisionManager, createProperties(), null) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 93c93976a3..7c74680425 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -277,7 +277,8 @@ public class Node { } final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener); - return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties); + return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, + revisionManager, nodeProperties, protocolSender); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java index 9f8439cb33..95381afd55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java @@ -17,12 +17,11 @@ package org.apache.nifi.controller; -import java.util.ArrayList; +import java.io.IOException; import java.util.List; -import java.util.Set; +import java.util.stream.Collectors; import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformant; @@ -37,14 +36,16 @@ public class ClusterCoordinatorNodeInformant implements NodeInformant { @Override public ClusterNodeInformation getNodeInformation() { - final List nodeInfoCollection = new ArrayList<>(); - final Set nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED); - - // TODO: Get total number of FlowFiles for each node - for (final NodeIdentifier nodeId : nodeIds) { - final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), - nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0); - nodeInfoCollection.add(nodeInfo); + final List nodeInfoCollection; + try { + nodeInfoCollection = clusterCoordinator.getClusterWorkload().entrySet().stream().map(entry -> { + final NodeIdentifier nodeId = entry.getKey(); + final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), + nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), entry.getValue().getFlowFileCount()); + return nodeInfo; + }).collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e); } final ClusterNodeInformation nodeInfo = new ClusterNodeInformation(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java index 63e5a35054..efb1c26cb1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java @@ -34,7 +34,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeWorkload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.VersionNegotiator; @@ -59,13 +59,13 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -216,18 +216,23 @@ public class SiteToSiteResource extends ApplicationResource { final List peers = new ArrayList<>(); if (properties.isNode()) { - final Set nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED); - // TODO: Get total number of FlowFiles for each node - for (final NodeIdentifier nodeId : nodeIds) { - final PeerDTO peer = new PeerDTO(); - final String siteToSiteAddress = nodeId.getSiteToSiteAddress(); - peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress); - peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort()); - peer.setSecure(nodeId.isSiteToSiteSecure()); - peer.setFlowFileCount(0); - peers.add(peer); + try { + final Map clusterWorkload = clusterCoordinator.getClusterWorkload(); + clusterWorkload.entrySet().stream().forEach(entry -> { + final PeerDTO peer = new PeerDTO(); + final NodeIdentifier nodeId = entry.getKey(); + final String siteToSiteAddress = nodeId.getSiteToSiteAddress(); + peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress); + peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort()); + peer.setSecure(nodeId.isSiteToSiteSecure()); + peer.setFlowFileCount(entry.getValue().getFlowFileCount()); + peers.add(peer); + }); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e); } + } else { // Standalone mode. final PeerDTO peer = new PeerDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java index 60a7ba9280..5f3aef3ddc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.web.api; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeWorkload; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.util.NiFiProperties; @@ -30,12 +33,17 @@ import org.junit.Test; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestSiteToSiteResource { @@ -116,6 +124,47 @@ public class TestSiteToSiteResource { assertEquals(1, resultEntity.getPeers().size()); } + @Test + public void testPeersClustered() throws Exception { + final HttpServletRequest req = createCommonHttpServletRequest(); + + final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class); + + final SiteToSiteResource resource = getSiteToSiteResourceClustered(serviceFacade); + + final ClusterCoordinator clusterCoordinator = mock(ClusterCoordinator.class); + final Map hostportWorkloads = new HashMap<>(); + final Map workloads = new HashMap<>(); + IntStream.range(1, 4).forEach(i -> { + final String hostname = "node" + i; + final int siteToSiteHttpApiPort = 8110 + i; + final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080 + i, hostname, 8090 + i, hostname, 8100 + i, siteToSiteHttpApiPort, false); + final NodeWorkload workload = new NodeWorkload(); + workload.setReportedTimestamp(System.currentTimeMillis() - i); + workload.setFlowFileBytes(1024 * i); + workload.setFlowFileCount(10 * i); + workload.setActiveThreadCount(i); + workload.setSystemStartTime(System.currentTimeMillis() - (1000 * i)); + workloads.put(nodeId, workload); + hostportWorkloads.put(hostname + ":" + siteToSiteHttpApiPort, workload); + }); + when(clusterCoordinator.getClusterWorkload()).thenReturn(workloads); + resource.setClusterCoordinator(clusterCoordinator); + + final Response response = resource.getPeers(req); + + PeersEntity resultEntity = (PeersEntity) response.getEntity(); + + assertEquals(200, response.getStatus()); + assertEquals(3, resultEntity.getPeers().size()); + resultEntity.getPeers().stream().forEach(peerDTO -> { + final NodeWorkload workload = hostportWorkloads.get(peerDTO.getHostname() + ":" + peerDTO.getPort()); + assertNotNull(workload); + assertEquals(workload.getFlowFileCount(), peerDTO.getFlowFileCount()); + }); + + } + @Test public void testPeersVersionWasNotSpecified() throws Exception { @@ -160,4 +209,18 @@ public class TestSiteToSiteResource { resource.setServiceFacade(serviceFacade); return resource; } + + private SiteToSiteResource getSiteToSiteResourceClustered(final NiFiServiceFacade serviceFacade) { + final Map clusterSettings = new HashMap<>(); + clusterSettings.put(NiFiProperties.CLUSTER_IS_NODE, "true"); + final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, clusterSettings); + final SiteToSiteResource resource = new SiteToSiteResource(properties) { + @Override + protected void authorizeSiteToSite() { + } + }; + resource.setProperties(properties); + resource.setServiceFacade(serviceFacade); + return resource; + } } \ No newline at end of file