NIFI-2825: Fix S2S getPeers flow file count

- Added ClusterWorkload message to retrieve workload information from a
  cluster coordinator
- Use cluster workload to return queued flow file count to site-to-site
  client so that it can calculate distribution of data transfer

This closes #1084.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2016-09-28 11:07:22 +09:00 committed by Bryan Bende
parent 34e5a5321a
commit 17a36c6fd5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
19 changed files with 413 additions and 35 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.cluster.coordination;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -24,6 +25,7 @@ import java.util.Set;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@ -236,4 +238,10 @@ public interface ClusterCoordinator {
* @return the current status of Flow Election.
*/
String getFlowElectionStatus();
/**
* @return the current cluster workload retrieved from the cluster coordinator.
* @throws IOException thrown when it failed to communicate with the cluster coordinator.
*/
Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException;
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.node;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "nodeWorkload")
public class NodeWorkload {
private long reportedTimestamp;
private int flowFileCount;
private long flowFileBytes;
private int activeThreadCount;
private long systemStartTime;
public long getReportedTimestamp() {
return reportedTimestamp;
}
public void setReportedTimestamp(long reportedTimestamp) {
this.reportedTimestamp = reportedTimestamp;
}
public int getFlowFileCount() {
return flowFileCount;
}
public void setFlowFileCount(int flowFileCount) {
this.flowFileCount = flowFileCount;
}
public long getFlowFileBytes() {
return flowFileBytes;
}
public void setFlowFileBytes(long flowFileBytes) {
this.flowFileBytes = flowFileBytes;
}
public int getActiveThreadCount() {
return activeThreadCount;
}
public void setActiveThreadCount(int activeThreadCount) {
this.activeThreadCount = activeThreadCount;
}
public long getSystemStartTime() {
return systemStartTime;
}
public void setSystemStartTime(long systemStartTime) {
this.systemStartTime = systemStartTime;
}
}

View File

@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -94,6 +96,21 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
}
@Override
public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadRequestMessage msg) throws ProtocolException {
final InetSocketAddress serviceAddress;
try {
serviceAddress = getServiceAddress();
} catch (IOException e) {
throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
}
final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort());
if (MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
return (ClusterWorkloadResponseMessage) responseMessage;
}
throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'");
}
private Socket createSocket() {
InetSocketAddress socketAddress = null;

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -49,4 +51,12 @@ public interface NodeProtocolSender {
* @return the response from the Cluster Coordinator
*/
HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
/**
* Sends a "cluster workflow request" message to the Cluster Coordinator.
* @param msg a request message
* @return the response from the Cluster Coordinator
* @throws ProtocolException if communication failed
*/
ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException;
}

View File

@ -24,6 +24,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -96,4 +98,9 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
return sender.heartbeat(msg, address);
}
@Override
public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
return sender.clusterWorkload(msg);
}
}

View File

@ -25,6 +25,8 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@ -101,4 +103,13 @@ public class ObjectFactory {
public HeartbeatResponseMessage createHeartbeatResponse() {
return new HeartbeatResponseMessage();
}
public ClusterWorkloadRequestMessage createClusterWorkloadRequest() {
return new ClusterWorkloadRequestMessage();
}
public ClusterWorkloadResponseMessage createClusterWorkloadResponse() {
return new ClusterWorkloadResponseMessage();
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "clusterWorkloadRequest")
public class ClusterWorkloadRequestMessage extends ProtocolMessage {
@Override
public MessageType getType() {
return MessageType.CLUSTER_WORKLOAD_REQUEST;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Map;
@XmlRootElement(name = "clusterWorkloadResponse")
public class ClusterWorkloadResponseMessage extends ProtocolMessage {
private Map<NodeIdentifier, NodeWorkload> nodeWorkloads;
@Override
public MessageType getType() {
return MessageType.CLUSTER_WORKLOAD_RESPONSE;
}
public Map<NodeIdentifier, NodeWorkload> getNodeWorkloads() {
return nodeWorkloads;
}
public void setNodeWorkloads(Map<NodeIdentifier, NodeWorkload> nodeWorkloads) {
this.nodeWorkloads = nodeWorkloads;
}
}

View File

@ -35,7 +35,9 @@ public abstract class ProtocolMessage {
HEARTBEAT_RESPONSE,
NODE_CONNECTION_STATUS_REQUEST,
NODE_CONNECTION_STATUS_RESPONSE,
NODE_STATUS_CHANGE;
NODE_STATUS_CHANGE,
CLUSTER_WORKLOAD_REQUEST,
CLUSTER_WORKLOAD_RESPONSE
}
public abstract MessageType getType();

View File

@ -23,12 +23,16 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import javax.xml.bind.JAXBException;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
@ -38,6 +42,8 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.web.Revision;
@ -124,4 +130,54 @@ public class TestJaxbProtocolUtils {
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof HeartbeatMessage);
}
@Test
public void testRoundTripClusterWorkloadRequest() throws JAXBException {
final ClusterWorkloadRequestMessage msg = new ClusterWorkloadRequestMessage();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof ClusterWorkloadRequestMessage);
}
@Test
public void testRoundTripClusterWorkloadResponse() throws JAXBException {
final ClusterWorkloadResponseMessage msg = new ClusterWorkloadResponseMessage();
final Map<NodeIdentifier, NodeWorkload> expectedNodeWorkloads = new HashMap<>();
IntStream.range(1, 4).forEach(i -> {
final String hostname = "node" + i;
final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080, hostname, 8081, hostname, 8082, 8083, false);
final NodeWorkload workload = new NodeWorkload();
workload.setReportedTimestamp(System.currentTimeMillis() - 1000);
workload.setSystemStartTime(System.currentTimeMillis());
workload.setActiveThreadCount(i);
workload.setFlowFileCount(i * 10);
workload.setFlowFileBytes(i * 10 * 1024);
expectedNodeWorkloads.put(nodeId, workload);
});
msg.setNodeWorkloads(expectedNodeWorkloads);
// Marshall.
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
// Un-marshall.
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof ClusterWorkloadResponseMessage);
// Assert result.
final ClusterWorkloadResponseMessage response = (ClusterWorkloadResponseMessage) unmarshalled;
assertEquals(expectedNodeWorkloads.size(), response.getNodeWorkloads().size());
response.getNodeWorkloads().entrySet().stream().forEach(entry -> {
assertTrue(expectedNodeWorkloads.containsKey(entry.getKey()));
final NodeWorkload w = entry.getValue();
NodeWorkload expectedW = expectedNodeWorkloads.get(entry.getKey());
assertEquals(expectedW.getActiveThreadCount(), w.getActiveThreadCount());
assertEquals(expectedW.getReportedTimestamp(), w.getReportedTimestamp());
assertEquals(expectedW.getSystemStartTime(), w.getSystemStartTime());
assertEquals(expectedW.getFlowFileBytes(), w.getFlowFileBytes());
assertEquals(expectedW.getFlowFileCount(), w.getFlowFileCount());
});
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -30,6 +31,7 @@ import javax.xml.bind.Unmarshaller;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -38,6 +40,8 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.util.NiFiProperties;
@ -130,11 +134,18 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
if (msg.getType() != MessageType.HEARTBEAT) {
throw new ProtocolException("Cannot handle message of type " + msg.getType());
switch (msg.getType()) {
case HEARTBEAT:
return handleHeartbeat((HeartbeatMessage) msg);
case CLUSTER_WORKLOAD_REQUEST:
return handleClusterWorkload((ClusterWorkloadRequestMessage) msg);
default:
throw new ProtocolException("Cannot handle message of type " + msg.getType());
}
}
final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) {
final HeartbeatMessage heartbeatMsg = msg;
final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
@ -169,6 +180,26 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
return responseMessage;
}
private ProtocolMessage handleClusterWorkload(final ClusterWorkloadRequestMessage msg) {
final ClusterWorkloadResponseMessage response = new ClusterWorkloadResponseMessage();
final Map<NodeIdentifier, NodeWorkload> workloads = new HashMap<>();
getLatestHeartbeats().values().stream()
.filter(hb -> NodeConnectionState.CONNECTED.equals(hb.getConnectionStatus().getState()))
.forEach(hb -> {
NodeWorkload wl = new NodeWorkload();
wl.setReportedTimestamp(hb.getTimestamp());
wl.setSystemStartTime(hb.getSystemStartTime());
wl.setActiveThreadCount(hb.getActiveThreadCount());
wl.setFlowFileCount(hb.getFlowFileCount());
wl.setFlowFileBytes(hb.getFlowFileBytes());
workloads.put(hb.getNodeIdentifier(), wl);
});
response.setNodeWorkloads(workloads);
return response;
}
private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) {
// Map node's statuses by NodeIdentifier for quick & easy lookup
final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream()
@ -201,6 +232,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public boolean canHandle(ProtocolMessage msg) {
return msg.getType() == MessageType.HEARTBEAT;
return msg.getType() == MessageType.HEARTBEAT || msg.getType() == MessageType.CLUSTER_WORKLOAD_REQUEST;
}
}

View File

@ -49,10 +49,13 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
@ -88,6 +91,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final LeaderElectionManager leaderElectionManager;
private final AtomicLong latestUpdateId = new AtomicLong(-1);
private final FlowElection flowElection;
private final NodeProtocolSender nodeProtocolSender;
private volatile FlowService flowService;
private volatile boolean connected;
@ -98,7 +102,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
final NodeProtocolSender nodeProtocolSender) {
this.senderListener = senderListener;
this.flowService = null;
this.eventReporter = eventReporter;
@ -107,6 +112,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.nifiProperties = nifiProperties;
this.leaderElectionManager = leaderElectionManager;
this.flowElection = flowElection;
this.nodeProtocolSender = nodeProtocolSender;
senderListener.addHandler(this);
}
@ -1043,4 +1049,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
public boolean isConnected() {
return connected;
}
@Override
public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
final ClusterWorkloadRequestMessage request = new ClusterWorkloadRequestMessage();
final ClusterWorkloadResponseMessage response = nodeProtocolSender.clusterWorkload(request);
return response.getNodeWorkloads();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
@ -46,8 +47,8 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
final FlowElection flowElection = applicationContext.getBean("flowElection", FlowElection.class);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties);
final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties, nodeProtocolSender);
}
return nodeClusterCoordinator;

View File

@ -36,6 +36,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@ -329,6 +330,11 @@ public class TestAbstractHeartbeatMonitor {
public String getFlowElectionStatus() {
return null;
}
@Override
public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
return null;
}
}

View File

@ -80,7 +80,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
@ -135,7 +135,8 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@ -175,7 +176,8 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}

View File

@ -277,7 +277,8 @@ public class Node {
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
revisionManager, nodeProperties, protocolSender);
}

View File

@ -17,12 +17,11 @@
package org.apache.nifi.controller;
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant;
@ -37,14 +36,16 @@ public class ClusterCoordinatorNodeInformant implements NodeInformant {
@Override
public ClusterNodeInformation getNodeInformation() {
final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
// TODO: Get total number of FlowFiles for each node
for (final NodeIdentifier nodeId : nodeIds) {
final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
nodeInfoCollection.add(nodeInfo);
final List<NodeInformation> nodeInfoCollection;
try {
nodeInfoCollection = clusterCoordinator.getClusterWorkload().entrySet().stream().map(entry -> {
final NodeIdentifier nodeId = entry.getKey();
final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), entry.getValue().getFlowFileCount());
return nodeInfo;
}).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
}
final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();

View File

@ -34,7 +34,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.VersionNegotiator;
@ -59,13 +59,13 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@ -216,18 +216,23 @@ public class SiteToSiteResource extends ApplicationResource {
final List<PeerDTO> peers = new ArrayList<>();
if (properties.isNode()) {
final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
// TODO: Get total number of FlowFiles for each node
for (final NodeIdentifier nodeId : nodeIds) {
final PeerDTO peer = new PeerDTO();
final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
peer.setSecure(nodeId.isSiteToSiteSecure());
peer.setFlowFileCount(0);
peers.add(peer);
try {
final Map<NodeIdentifier, NodeWorkload> clusterWorkload = clusterCoordinator.getClusterWorkload();
clusterWorkload.entrySet().stream().forEach(entry -> {
final PeerDTO peer = new PeerDTO();
final NodeIdentifier nodeId = entry.getKey();
final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
peer.setSecure(nodeId.isSiteToSiteSecure());
peer.setFlowFileCount(entry.getValue().getFlowFileCount());
peers.add(peer);
});
} catch (IOException e) {
throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
}
} else {
// Standalone mode.
final PeerDTO peer = new PeerDTO();

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.web.api;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.NiFiProperties;
@ -30,12 +33,17 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestSiteToSiteResource {
@ -116,6 +124,47 @@ public class TestSiteToSiteResource {
assertEquals(1, resultEntity.getPeers().size());
}
@Test
public void testPeersClustered() throws Exception {
final HttpServletRequest req = createCommonHttpServletRequest();
final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
final SiteToSiteResource resource = getSiteToSiteResourceClustered(serviceFacade);
final ClusterCoordinator clusterCoordinator = mock(ClusterCoordinator.class);
final Map<String, NodeWorkload> hostportWorkloads = new HashMap<>();
final Map<NodeIdentifier, NodeWorkload> workloads = new HashMap<>();
IntStream.range(1, 4).forEach(i -> {
final String hostname = "node" + i;
final int siteToSiteHttpApiPort = 8110 + i;
final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080 + i, hostname, 8090 + i, hostname, 8100 + i, siteToSiteHttpApiPort, false);
final NodeWorkload workload = new NodeWorkload();
workload.setReportedTimestamp(System.currentTimeMillis() - i);
workload.setFlowFileBytes(1024 * i);
workload.setFlowFileCount(10 * i);
workload.setActiveThreadCount(i);
workload.setSystemStartTime(System.currentTimeMillis() - (1000 * i));
workloads.put(nodeId, workload);
hostportWorkloads.put(hostname + ":" + siteToSiteHttpApiPort, workload);
});
when(clusterCoordinator.getClusterWorkload()).thenReturn(workloads);
resource.setClusterCoordinator(clusterCoordinator);
final Response response = resource.getPeers(req);
PeersEntity resultEntity = (PeersEntity) response.getEntity();
assertEquals(200, response.getStatus());
assertEquals(3, resultEntity.getPeers().size());
resultEntity.getPeers().stream().forEach(peerDTO -> {
final NodeWorkload workload = hostportWorkloads.get(peerDTO.getHostname() + ":" + peerDTO.getPort());
assertNotNull(workload);
assertEquals(workload.getFlowFileCount(), peerDTO.getFlowFileCount());
});
}
@Test
public void testPeersVersionWasNotSpecified() throws Exception {
@ -160,4 +209,18 @@ public class TestSiteToSiteResource {
resource.setServiceFacade(serviceFacade);
return resource;
}
private SiteToSiteResource getSiteToSiteResourceClustered(final NiFiServiceFacade serviceFacade) {
final Map<String, String> clusterSettings = new HashMap<>();
clusterSettings.put(NiFiProperties.CLUSTER_IS_NODE, "true");
final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, clusterSettings);
final SiteToSiteResource resource = new SiteToSiteResource(properties) {
@Override
protected void authorizeSiteToSite() {
}
};
resource.setProperties(properties);
resource.setServiceFacade(serviceFacade);
return resource;
}
}