mirror of https://github.com/apache/nifi.git
NIFI-5585 A node that was previously offloaded can now be reconnected to the cluster and queue flowfiles again
Added Spock test for NonLocalPartitionPartitioner Updated NOTICE files for FontAwesome with the updated version (4.7.0) and URL to the free license Updated package-lock.json with the updated version of FontAwesome (4.7.0) Added method to FlowFileQueue interface to reset an offloaded queue Queues that are now immediately have the offloaded status reset once offloading finishes SocketLoadBalancedFlowFileQueue now ignores back-pressure when offloading flowfiles Cleaned up javascript in nf-cluster-table.js when creating markup for the node operation icons Fixed incorrect handling of a heartbeat from an offloaded node. Heartbeats from offloading or offloaded nodes will now be reported as an event, the heartbeat will be removed and ignored. Added unit tests and integration tests to cover offloading nodes Updated Cluster integration test class with accessor for the current cluster coordinator Updated Node integration test class's custom NiFiProperties implementation to return the load balancing port and a method to assert an offloaded node Added exclusion to top-level pom for ITSpec.class
This commit is contained in:
parent
be2c24cfaf
commit
01e2098d24
|
@ -1873,4 +1873,4 @@ SIL OFL 1.1
|
|||
******************
|
||||
|
||||
The following binary components are provided under the SIL Open Font License 1.1
|
||||
(SIL OFL 1.1) FontAwesome (4.6.1 - http://fortawesome.github.io/Font-Awesome/license/)
|
||||
(SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)
|
||||
|
|
|
@ -267,8 +267,22 @@ public interface FlowFileQueue {
|
|||
|
||||
void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute);
|
||||
|
||||
/**
|
||||
* Offloads the flowfiles in the queue to other nodes. This disables the queue from partition flowfiles locally.
|
||||
* <p>
|
||||
* This operation is a no-op if the node that contains this queue is not in a cluster.
|
||||
*/
|
||||
void offloadQueue();
|
||||
|
||||
/**
|
||||
* Resets a queue that has previously been offloaded. This allows the queue to partition flowfiles locally, and
|
||||
* has no other effect on processors or remote process groups.
|
||||
* <p>
|
||||
* This operation is a no-op if the queue is not currently offloaded or the node that contains this queue is not
|
||||
* clustered.
|
||||
*/
|
||||
void resetOffloadedQueue();
|
||||
|
||||
LoadBalanceStrategy getLoadBalanceStrategy();
|
||||
|
||||
void setLoadBalanceCompression(LoadBalanceCompression compression);
|
||||
|
|
|
@ -212,6 +212,6 @@ SIL OFL 1.1
|
|||
******************
|
||||
|
||||
The following binary components are provided under the SIL Open Font License 1.1
|
||||
(SIL OFL 1.1) FontAwesome (4.6.1 - http://fortawesome.github.io/Font-Awesome/license/)
|
||||
(SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)
|
||||
|
||||
|
||||
|
|
|
@ -72,7 +72,14 @@ public interface ClusterCoordinator {
|
|||
/**
|
||||
* Sends a request to the node to be offloaded.
|
||||
* The node will be marked as offloading immediately.
|
||||
*
|
||||
* <p>
|
||||
* When a node is offloaded:
|
||||
* <ul>
|
||||
* <li>all processors on the node are stopped</li>
|
||||
* <li>all processors on the node are terminated</li>
|
||||
* <li>all remote process groups on the node stop transmitting</li>
|
||||
* <li>all flowfiles on the node are sent to other nodes in the cluster</li>
|
||||
* </ul>
|
||||
* @param nodeId the identifier of the node
|
||||
* @param offloadCode the code that represents why this node is being asked to be offloaded
|
||||
* @param explanation an explanation as to why the node is being asked to be offloaded
|
||||
|
|
|
@ -228,12 +228,14 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
|
|||
return;
|
||||
}
|
||||
|
||||
if (NodeConnectionState.OFFLOADED == connectionState) {
|
||||
// Cluster Coordinator believes that node is offloaded, but let the node reconnect
|
||||
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node that is offloaded. " +
|
||||
"Marking as Disconnected and requesting that Node reconnect to cluster");
|
||||
clusterCoordinator.requestNodeConnect(nodeId, null);
|
||||
if (NodeConnectionState.OFFLOADED == connectionState || NodeConnectionState.OFFLOADING == connectionState) {
|
||||
// Cluster Coordinator can ignore this heartbeat since the node is offloaded
|
||||
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node that is offloading " +
|
||||
"or offloaded. Removing this heartbeat. Offloaded nodes will only be reconnected to the cluster by an " +
|
||||
"explicit connection request or restarting the node.");
|
||||
removeHeartbeat(nodeId);
|
||||
}
|
||||
|
||||
if (NodeConnectionState.DISCONNECTED == connectionState) {
|
||||
// ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
|
||||
// the only node. We allow it if it is the only node because if we have a one-node cluster, then
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestExc
|
|||
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
|
||||
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
||||
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
||||
import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
|
||||
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
||||
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
|
|
|
@ -486,11 +486,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
|||
}
|
||||
|
||||
if (state != NodeConnectionState.OFFLOADING) {
|
||||
logger.warn("Attempted to finish node offload for {} but node is not in a offload state, it is currently {}.", nodeId, state);
|
||||
logger.warn("Attempted to finish node offload for {} but node is not in the offloading state, it is currently {}.", nodeId, state);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("{} is now offloaded", nodeId);
|
||||
|
||||
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.coordination.node
|
||||
|
||||
import org.apache.nifi.cluster.coordination.flow.FlowElection
|
||||
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier
|
||||
import org.apache.nifi.cluster.protocol.NodeProtocolSender
|
||||
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener
|
||||
import org.apache.nifi.cluster.protocol.message.OffloadMessage
|
||||
import org.apache.nifi.components.state.Scope
|
||||
import org.apache.nifi.components.state.StateManager
|
||||
import org.apache.nifi.components.state.StateManagerProvider
|
||||
import org.apache.nifi.controller.leader.election.LeaderElectionManager
|
||||
import org.apache.nifi.events.EventReporter
|
||||
import org.apache.nifi.reporting.Severity
|
||||
import org.apache.nifi.state.MockStateMap
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.apache.nifi.web.revision.RevisionManager
|
||||
import spock.lang.Specification
|
||||
import spock.util.concurrent.BlockingVariable
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NodeClusterCoordinatorSpec extends Specification {
|
||||
def "requestNodeOffload"() {
|
||||
given: 'mocked collaborators'
|
||||
def clusterCoordinationProtocolSenderListener = Mock(ClusterCoordinationProtocolSenderListener)
|
||||
def eventReporter = Mock EventReporter
|
||||
def stateManager = Mock StateManager
|
||||
def stateMap = new MockStateMap([:], 1)
|
||||
stateManager.getState(_ as Scope) >> stateMap
|
||||
def stateManagerProvider = Mock StateManagerProvider
|
||||
stateManagerProvider.getStateManager(_ as String) >> stateManager
|
||||
|
||||
and: 'a NodeClusterCoordinator that manages node status in a synchronized list'
|
||||
List<NodeConnectionStatus> nodeStatuses = [].asSynchronized()
|
||||
def clusterCoordinator = new NodeClusterCoordinator(clusterCoordinationProtocolSenderListener, eventReporter, Mock(LeaderElectionManager),
|
||||
Mock(FlowElection), Mock(ClusterNodeFirewall),
|
||||
Mock(RevisionManager), NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties', [:]),
|
||||
Mock(NodeProtocolSender), stateManagerProvider) {
|
||||
@Override
|
||||
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
|
||||
nodeStatuses.add(updatedStatus)
|
||||
}
|
||||
}
|
||||
|
||||
and: 'two nodes'
|
||||
def nodeIdentifier1 = createNodeIdentifier 1
|
||||
def nodeIdentifier2 = createNodeIdentifier 2
|
||||
|
||||
and: 'node 1 is connected, node 2 is disconnected'
|
||||
clusterCoordinator.updateNodeStatus new NodeConnectionStatus(nodeIdentifier1, NodeConnectionState.CONNECTED)
|
||||
clusterCoordinator.updateNodeStatus new NodeConnectionStatus(nodeIdentifier2, NodeConnectionState.DISCONNECTED)
|
||||
while (nodeStatuses.size() < 2) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
nodeStatuses.clear()
|
||||
|
||||
def waitForReportEvent = new BlockingVariable(5, TimeUnit.SECONDS)
|
||||
|
||||
when: 'a node is requested to offload'
|
||||
clusterCoordinator.requestNodeOffload nodeIdentifier2, OffloadCode.OFFLOADED, 'unit test for offloading node'
|
||||
waitForReportEvent.get()
|
||||
|
||||
then: 'no exceptions are thrown'
|
||||
noExceptionThrown()
|
||||
|
||||
and: 'expected methods on collaborators are invoked'
|
||||
1 * clusterCoordinationProtocolSenderListener.offload({ OffloadMessage msg -> msg.nodeId == nodeIdentifier2 } as OffloadMessage)
|
||||
1 * eventReporter.reportEvent(Severity.INFO, 'Clustering', { msg -> msg.contains "$nodeIdentifier2.apiAddress:$nodeIdentifier2.apiPort" } as String) >> {
|
||||
waitForReportEvent.set(it)
|
||||
}
|
||||
|
||||
and: 'the status of the offloaded node is known by the cluster coordinator to be offloading'
|
||||
nodeStatuses[0].nodeIdentifier == nodeIdentifier2
|
||||
nodeStatuses[0].state == NodeConnectionState.OFFLOADING
|
||||
}
|
||||
|
||||
private static NodeIdentifier createNodeIdentifier(final int index) {
|
||||
new NodeIdentifier("node-id-$index", "localhost", 8000 + index, "localhost", 9000 + index,
|
||||
"localhost", 10000 + index, 11000 + index, false)
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.integration
|
||||
|
||||
import org.apache.nifi.cluster.coordination.node.DisconnectionCode
|
||||
import org.apache.nifi.cluster.coordination.node.OffloadCode
|
||||
import spock.lang.Specification
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class OffloadNodeITSpec extends Specification {
|
||||
def "requestNodeOffload"() {
|
||||
given: 'a cluster with 3 nodes'
|
||||
System.setProperty 'nifi.properties.file.path', 'src/test/resources/conf/nifi.properties'
|
||||
def cluster = new Cluster()
|
||||
cluster.start()
|
||||
cluster.createNode()
|
||||
def nodeToOffload = cluster.createNode()
|
||||
cluster.createNode()
|
||||
cluster.waitUntilAllNodesConnected 20, TimeUnit.SECONDS
|
||||
|
||||
when: 'the node to offload is disconnected successfully'
|
||||
cluster.currentClusterCoordinator.clusterCoordinator.requestNodeDisconnect nodeToOffload.identifier, DisconnectionCode.USER_DISCONNECTED,
|
||||
'integration test user disconnect'
|
||||
cluster.currentClusterCoordinator.assertNodeDisconnects nodeToOffload.identifier, 10, TimeUnit.SECONDS
|
||||
|
||||
and: 'the node to offload is requested to offload'
|
||||
nodeToOffload.getClusterCoordinator().requestNodeOffload nodeToOffload.identifier, OffloadCode.OFFLOADED, 'integration test offload'
|
||||
|
||||
then: 'the node has been successfully offloaded'
|
||||
cluster.currentClusterCoordinator.assertNodeIsOffloaded nodeToOffload.identifier, 10, TimeUnit.SECONDS
|
||||
|
||||
cleanup:
|
||||
cluster.stop()
|
||||
}
|
||||
}
|
|
@ -144,6 +144,10 @@ public class Cluster {
|
|||
return node;
|
||||
}
|
||||
|
||||
public Node getCurrentClusterCoordinator() {
|
||||
return getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null);
|
||||
}
|
||||
|
||||
public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) {
|
||||
return ClusterUtils.waitUntilNonNull(time, timeUnit,
|
||||
() -> getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
|
||||
|
|
|
@ -108,6 +108,8 @@ public class Node {
|
|||
return String.valueOf(nodeId.getSocketPort());
|
||||
}else if(key.equals(NiFiProperties.WEB_HTTP_PORT)){
|
||||
return String.valueOf(nodeId.getApiPort());
|
||||
}else if(key.equals(NiFiProperties.LOAD_BALANCE_PORT)){
|
||||
return String.valueOf(nodeId.getLoadBalancePort());
|
||||
}else {
|
||||
return properties.getProperty(key);
|
||||
}
|
||||
|
@ -386,4 +388,17 @@ public class Node {
|
|||
public void assertNodeIsConnected(final NodeIdentifier nodeId) {
|
||||
Assert.assertEquals(NodeConnectionState.CONNECTED, getClusterCoordinator().getConnectionStatus(nodeId).getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the node with the given ID is offloaded (according to this node!) within the given amount of time
|
||||
*
|
||||
* @param nodeId id of the node
|
||||
* @param time how long to wait
|
||||
* @param timeUnit unit of time provided by the 'time' argument
|
||||
*/
|
||||
public void assertNodeIsOffloaded(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
|
||||
ClusterUtils.waitUntilConditionMet(time, timeUnit,
|
||||
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.OFFLOADED,
|
||||
() -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -691,20 +691,19 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
private void offload(final String explanation) throws InterruptedException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
|
||||
logger.info("Offloading node due to " + explanation);
|
||||
|
||||
// mark node as offloading
|
||||
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation));
|
||||
// request to stop all processors on node
|
||||
controller.stopAllProcessors();
|
||||
// request to stop all remote process groups
|
||||
controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting);
|
||||
// terminate all processors
|
||||
controller.getRootGroup().findAllProcessors()
|
||||
// filter stream, only stopped processors can be terminated
|
||||
.stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED)
|
||||
.forEach(pn -> pn.getProcessGroup().terminateProcessor(pn));
|
||||
// request to stop all remote process groups
|
||||
controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting);
|
||||
// offload all queues on node
|
||||
controller.getAllQueues().forEach(FlowFileQueue::offloadQueue);
|
||||
// wait for rebalance of flowfiles on all queues
|
||||
|
@ -713,6 +712,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
Thread.sleep(1000);
|
||||
}
|
||||
// finish offload
|
||||
controller.getAllQueues().forEach(FlowFileQueue::resetOffloadedQueue);
|
||||
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation));
|
||||
clusterCoordinator.finishNodeOffload(getNodeId());
|
||||
|
||||
|
|
|
@ -78,6 +78,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
|
|||
public void offloadQueue() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetOffloadedQueue() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActivelyLoadBalancing() {
|
||||
return false;
|
||||
|
|
|
@ -189,34 +189,41 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
if (!offloaded) {
|
||||
// We are already load balancing but are changing how we are load balancing.
|
||||
final FlowFilePartitioner partitioner;
|
||||
switch (strategy) {
|
||||
case DO_NOT_LOAD_BALANCE:
|
||||
partitioner = new LocalPartitionPartitioner();
|
||||
break;
|
||||
case PARTITION_BY_ATTRIBUTE:
|
||||
partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
|
||||
break;
|
||||
case ROUND_ROBIN:
|
||||
partitioner = new RoundRobinPartitioner();
|
||||
break;
|
||||
case SINGLE_NODE:
|
||||
partitioner = new FirstNodePartitioner();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
partitioner = getPartitionerForLoadBalancingStrategy(strategy, partitioningAttribute);
|
||||
|
||||
setFlowFilePartitioner(partitioner);
|
||||
}
|
||||
}
|
||||
|
||||
private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) {
|
||||
FlowFilePartitioner partitioner;
|
||||
switch (strategy) {
|
||||
case DO_NOT_LOAD_BALANCE:
|
||||
partitioner = new LocalPartitionPartitioner();
|
||||
break;
|
||||
case PARTITION_BY_ATTRIBUTE:
|
||||
partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
|
||||
break;
|
||||
case ROUND_ROBIN:
|
||||
partitioner = new RoundRobinPartitioner();
|
||||
break;
|
||||
case SINGLE_NODE:
|
||||
partitioner = new FirstNodePartitioner();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
return partitioner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offloadQueue() {
|
||||
if (clusterCoordinator == null) {
|
||||
// Not clustered, so don't change partitions
|
||||
// Not clustered, cannot offload the queue to other nodes
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug("Setting queue {} on node {} as offloaded", this, clusterCoordinator.getLocalNodeIdentifier());
|
||||
offloaded = true;
|
||||
|
||||
partitionWriteLock.lock();
|
||||
|
@ -248,6 +255,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetOffloadedQueue() {
|
||||
if (clusterCoordinator == null) {
|
||||
// Not clustered, was not offloading the queue to other nodes
|
||||
return;
|
||||
}
|
||||
|
||||
if (offloaded) {
|
||||
// queue was offloaded previously, allow files to be added to the local partition
|
||||
offloaded = false;
|
||||
logger.debug("Queue {} on node {} was previously offloaded, resetting offloaded status to {}",
|
||||
this, clusterCoordinator.getLocalNodeIdentifier(), offloaded);
|
||||
// reset the partitioner based on the load balancing strategy, since offloading previously changed the partitioner
|
||||
FlowFilePartitioner partitioner = getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), getPartitioningAttribute());
|
||||
setFlowFilePartitioner(partitioner);
|
||||
logger.debug("Queue {} is no longer offloaded, restored load balance strategy to {} and partitioning attribute to \"{}\"",
|
||||
this, getLoadBalanceStrategy(), getPartitioningAttribute());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void startLoadBalancing() {
|
||||
logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this);
|
||||
|
||||
|
@ -884,8 +911,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
|
||||
@Override
|
||||
public boolean isPropagateBackpressureAcrossNodes() {
|
||||
// TODO: We will want to modify this when we have the ability to offload flowfiles from a node.
|
||||
return true;
|
||||
// If offloaded = false, the queue is not offloading; return true to honor backpressure
|
||||
// If offloaded = true, the queue is offloading or has finished offloading; return false to ignore backpressure
|
||||
return !offloaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1096,6 +1124,12 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
}
|
||||
|
||||
switch (newState) {
|
||||
case CONNECTED:
|
||||
if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
|
||||
// the node with this queue was connected to the cluster, make sure the queue is not offloaded
|
||||
resetOffloadedQueue();
|
||||
}
|
||||
break;
|
||||
case OFFLOADED:
|
||||
case OFFLOADING:
|
||||
case DISCONNECTED:
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller
|
||||
|
||||
import org.apache.nifi.authorization.Authorizer
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus
|
||||
import org.apache.nifi.cluster.coordination.node.OffloadCode
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier
|
||||
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener
|
||||
import org.apache.nifi.cluster.protocol.message.OffloadMessage
|
||||
import org.apache.nifi.components.state.Scope
|
||||
import org.apache.nifi.components.state.StateManager
|
||||
import org.apache.nifi.components.state.StateManagerProvider
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus
|
||||
import org.apache.nifi.encrypt.StringEncryptor
|
||||
import org.apache.nifi.groups.ProcessGroup
|
||||
import org.apache.nifi.groups.RemoteProcessGroup
|
||||
import org.apache.nifi.state.MockStateMap
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.apache.nifi.web.revision.RevisionManager
|
||||
import spock.lang.Specification
|
||||
import spock.util.concurrent.BlockingVariable
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class StandardFlowServiceSpec extends Specification {
|
||||
def "handle an OffloadMessage"() {
|
||||
given: 'a node to offload'
|
||||
def nodeToOffload = createNodeIdentifier 1
|
||||
|
||||
and: 'a simple flow with one root group and a single processor'
|
||||
def stateManager = Mock StateManager
|
||||
def stateMap = new MockStateMap([:], 1)
|
||||
stateManager.getState(_ as Scope) >> stateMap
|
||||
def stateManagerProvider = Mock StateManagerProvider
|
||||
stateManagerProvider.getStateManager(_ as String) >> stateManager
|
||||
|
||||
def rootGroup = Mock ProcessGroup
|
||||
def flowController = Mock FlowController
|
||||
flowController.getStateManagerProvider() >> stateManagerProvider
|
||||
_ * flowController.rootGroup >> rootGroup
|
||||
|
||||
def clusterCoordinator = Mock ClusterCoordinator
|
||||
|
||||
def processGroupStatus = Mock ProcessGroupStatus
|
||||
def processorNode = Mock ProcessorNode
|
||||
def remoteProcessGroup = Mock RemoteProcessGroup
|
||||
def flowFileQueue = Mock FlowFileQueue
|
||||
|
||||
and: 'a flow service to handle the OffloadMessage'
|
||||
def flowService = StandardFlowService.createClusteredInstance(flowController, NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties',
|
||||
[(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT): nodeToOffload.socketPort as String,
|
||||
(NiFiProperties.WEB_HTTP_PORT) : nodeToOffload.apiPort as String,
|
||||
(NiFiProperties.LOAD_BALANCE_PORT) : nodeToOffload.loadBalancePort as String]),
|
||||
Mock(NodeProtocolSenderListener), clusterCoordinator, Mock(StringEncryptor), Mock(RevisionManager), Mock(Authorizer))
|
||||
|
||||
def waitForFinishOffload = new BlockingVariable(5, TimeUnit.SECONDS)//new CountDownLatch(1)
|
||||
|
||||
when: 'the flow services receives an OffloadMessage'
|
||||
flowService.handle(new OffloadMessage(nodeId: nodeToOffload, explanation: 'unit test offload'), [] as Set)
|
||||
waitForFinishOffload.get()
|
||||
|
||||
then: 'no exceptions are thrown'
|
||||
noExceptionThrown()
|
||||
|
||||
and: 'the connection status for the node in the flow controller is set to OFFLOADING'
|
||||
1 * flowController.setConnectionStatus({ NodeConnectionStatus status ->
|
||||
status.nodeIdentifier.logicallyEquals(nodeToOffload) && status.state == NodeConnectionState.OFFLOADING && status.offloadCode == OffloadCode.OFFLOADED
|
||||
} as NodeConnectionStatus)
|
||||
|
||||
then: 'all processors are requested to stop'
|
||||
1 * flowController.stopAllProcessors()
|
||||
|
||||
then: 'all processors are requested to terminate'
|
||||
1 * processorNode.scheduledState >> ScheduledState.STOPPED
|
||||
1 * processorNode.processGroup >> rootGroup
|
||||
1 * rootGroup.terminateProcessor({ ProcessorNode pn -> pn == processorNode } as ProcessorNode)
|
||||
1 * rootGroup.findAllProcessors() >> [processorNode]
|
||||
|
||||
then: 'all remote process groups are requested to terminate'
|
||||
1 * remoteProcessGroup.stopTransmitting()
|
||||
1 * rootGroup.findAllRemoteProcessGroups() >> [remoteProcessGroup]
|
||||
|
||||
then: 'all queues are requested to offload'
|
||||
1 * flowFileQueue.offloadQueue()
|
||||
1 * flowController.getAllQueues() >> [flowFileQueue]
|
||||
|
||||
then: 'the queued count in the flow controller status is 0 to allow the offloading code to to complete'
|
||||
1 * flowController.getControllerStatus() >> processGroupStatus
|
||||
1 * processGroupStatus.getQueuedCount() >> 0
|
||||
|
||||
then: 'all queues are requested to reset to the original partitioner for the load balancing strategy'
|
||||
1 * flowFileQueue.resetOffloadedQueue()
|
||||
1 * flowController.getAllQueues() >> [flowFileQueue]
|
||||
|
||||
then: 'the connection status for the node in the flow controller is set to OFFLOADED'
|
||||
1 * flowController.setConnectionStatus({ NodeConnectionStatus status ->
|
||||
status.nodeIdentifier.logicallyEquals(nodeToOffload) && status.state == NodeConnectionState.OFFLOADED && status.offloadCode == OffloadCode.OFFLOADED
|
||||
} as NodeConnectionStatus)
|
||||
|
||||
then: 'the cluster coordinator is requested to finish the node offload'
|
||||
1 * clusterCoordinator.finishNodeOffload({ NodeIdentifier nodeIdentifier ->
|
||||
nodeIdentifier.logicallyEquals(nodeToOffload)
|
||||
} as NodeIdentifier) >> { waitForFinishOffload.set(it) }
|
||||
}
|
||||
|
||||
private static NodeIdentifier createNodeIdentifier(final int index) {
|
||||
new NodeIdentifier("node-id-$index", "localhost", 8000 + index, "localhost", 9000 + index,
|
||||
"localhost", 10000 + index, 11000 + index, false)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.queue.clustered.partition
|
||||
|
||||
|
||||
import org.apache.nifi.controller.repository.FlowFileRecord
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NonLocalPartitionPartitionerSpec extends Specification {
|
||||
|
||||
def "getPartition chooses local partition with 1 partition and throws IllegalStateException"() {
|
||||
given: "a local partitioner using a local partition"
|
||||
def partitioner = new NonLocalPartitionPartitioner()
|
||||
def localPartition = Mock QueuePartition
|
||||
def partitions = [localPartition] as QueuePartition[]
|
||||
def flowFileRecord = Mock FlowFileRecord
|
||||
|
||||
when: "a partition is requested from the partitioner"
|
||||
partitioner.getPartition flowFileRecord, partitions, localPartition
|
||||
|
||||
then: "an IllegalStateExceptions thrown"
|
||||
thrown(IllegalStateException)
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "getPartition chooses non-local partition with #maxPartitions partitions, #threads threads, #iterations iterations"() {
|
||||
given: "a local partitioner"
|
||||
def partitioner = new NonLocalPartitionPartitioner()
|
||||
def partitions = new QueuePartition[maxPartitions]
|
||||
|
||||
and: "a local partition"
|
||||
def localPartition = Mock QueuePartition
|
||||
partitions[0] = localPartition
|
||||
|
||||
and: "one or more multiple partitions"
|
||||
for (int id = 1; id < maxPartitions; ++id) {
|
||||
def partition = Mock QueuePartition
|
||||
partitions[id] = partition
|
||||
}
|
||||
|
||||
and: "an array to hold the resulting chosen partitions and an executor service with one or more threads"
|
||||
def flowFileRecord = Mock FlowFileRecord
|
||||
def chosenPartitions = [] as ConcurrentLinkedQueue
|
||||
def executorService = Executors.newFixedThreadPool threads
|
||||
|
||||
when: "a partition is requested from the partitioner for a given flowfile record and the existing partitions"
|
||||
iterations.times {
|
||||
executorService.submit {
|
||||
chosenPartitions.add partitioner.getPartition(flowFileRecord, partitions, localPartition)
|
||||
}
|
||||
}
|
||||
executorService.shutdown()
|
||||
try {
|
||||
while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
executorService.shutdownNow()
|
||||
Thread.currentThread().interrupt()
|
||||
}
|
||||
|
||||
then: "no exceptions are thrown"
|
||||
noExceptionThrown()
|
||||
|
||||
and: "there is a chosen partition for each iteration"
|
||||
chosenPartitions.size() == iterations
|
||||
|
||||
and: "each chosen partition is a remote partition and is one of the existing partitions"
|
||||
def validChosenPartitions = chosenPartitions.findAll { it != localPartition && partitions.contains(it) }
|
||||
|
||||
and: "there is a valid chosen partition for each iteration"
|
||||
validChosenPartitions.size() == iterations
|
||||
|
||||
and: "there are no other mock interactions"
|
||||
0 * _
|
||||
|
||||
where:
|
||||
maxPartitions | threads | iterations
|
||||
2 | 1 | 1
|
||||
2 | 1 | 10
|
||||
2 | 1 | 100
|
||||
2 | 10 | 1000
|
||||
5 | 1 | 1
|
||||
5 | 1 | 10
|
||||
5 | 1 | 100
|
||||
5 | 10 | 1000
|
||||
}
|
||||
}
|
|
@ -113,6 +113,10 @@ public class TestWriteAheadFlowFileRepository {
|
|||
public void offloadQueue() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetOffloadedQueue() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActivelyLoadBalancing() {
|
||||
return false;
|
||||
|
|
|
@ -4707,7 +4707,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
|
||||
final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier);
|
||||
if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
|
||||
throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + " because it is not disconnected, current state = " + nodeConnectionStatus.getState());
|
||||
throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId +
|
||||
" because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState());
|
||||
}
|
||||
|
||||
clusterCoordinator.removeNode(nodeIdentifier, userDn);
|
||||
|
|
|
@ -362,9 +362,9 @@
|
|||
"integrity": "sha512-ipiDYhdQSCZ4hSbX4rMW+XzNKMD1prg/sTvoVmSLkuQ1MVlwjJQQA+sW8tMYR3BLUr9KjodFV4pvzunvRhd33Q=="
|
||||
},
|
||||
"font-awesome": {
|
||||
"version": "4.6.1",
|
||||
"resolved": "https://registry.npmjs.org/font-awesome/-/font-awesome-4.6.1.tgz",
|
||||
"integrity": "sha1-VHJl+0xFu+2Qq4vE93qXs3uFKhI="
|
||||
"version": "4.7.0",
|
||||
"resolved": "https://registry.npmjs.org/font-awesome/-/font-awesome-4.7.0.tgz",
|
||||
"integrity": "sha1-j6jPBBGhoxr9B7BtKQK7n8gVoTM="
|
||||
},
|
||||
"has-color": {
|
||||
"version": "0.1.7",
|
||||
|
|
|
@ -8,4 +8,4 @@ SIL OFL 1.1
|
|||
******************
|
||||
|
||||
The following binary components are provided under the SIL Open Font License 1.1
|
||||
(SIL OFL 1.1) FontAwesome (4.6.1 - http://fortawesome.github.io/Font-Awesome/license/)
|
||||
(SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)
|
||||
|
|
|
@ -630,34 +630,25 @@
|
|||
// only allow the admin to modify the cluster
|
||||
if (nfCommon.canModifyController()) {
|
||||
var actionFormatter = function (row, cell, value, columnDef, dataContext) {
|
||||
var canDisconnect = false;
|
||||
var canConnect = false;
|
||||
var isOffloaded = false;
|
||||
var connectDiv = '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>';
|
||||
var deleteDiv = '<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>';
|
||||
var disconnectDiv = '<div title="Disconnect" class="pointer prompt-for-disconnect fa fa-power-off"></div>';
|
||||
var offloadDiv = '<div title="Offload" class="pointer prompt-for-offload fa fa-rotate-90 fa-upload" ' +
|
||||
'style="margin-top: 5px;margin-left: 5px;margin-right: -2px;"></div>';
|
||||
var markup = '';
|
||||
|
||||
// determine the current status
|
||||
// determine the current status and create the appropriate markup
|
||||
if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') {
|
||||
canDisconnect = true;
|
||||
}
|
||||
if (dataContext.status === 'DISCONNECTED') {
|
||||
canConnect = true;
|
||||
}
|
||||
if (dataContext.status === 'OFFLOADED') {
|
||||
isOffloaded = true;
|
||||
markup += disconnectDiv;
|
||||
} else if (dataContext.status === 'DISCONNECTED') {
|
||||
markup += connectDiv + offloadDiv + deleteDiv;
|
||||
} else if (dataContext.status === 'OFFLOADED') {
|
||||
markup += connectDiv + deleteDiv;
|
||||
} else {
|
||||
markup += '<div style="width: 16px; height: 16px;"> </div>';
|
||||
}
|
||||
|
||||
// return the appropriate markup
|
||||
if (canConnect) {
|
||||
return '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>' +
|
||||
'<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>' +
|
||||
'<div title="Offload" class="pointer prompt-for-offload fa fa-rotate-90 fa-upload"></div>';
|
||||
} else if (canDisconnect) {
|
||||
return '<div title="Disconnect" class="pointer prompt-for-disconnect fa fa-power-off"></div>';
|
||||
} else if (isOffloaded) {
|
||||
return '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>' +
|
||||
'<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>';
|
||||
} else {
|
||||
return '<div style="width: 16px; height: 16px;"> </div>';
|
||||
}
|
||||
return markup;
|
||||
};
|
||||
|
||||
columnModel.push({
|
||||
|
|
|
@ -57,7 +57,7 @@ public class DisconnectNode extends AbstractNiFiCommand<NodeResult> {
|
|||
|
||||
NodeDTO nodeDto = new NodeDTO();
|
||||
nodeDto.setNodeId(nodeId);
|
||||
// TODO There's no constant for node status in
|
||||
// TODO There are no constants for the DISCONNECT node status
|
||||
nodeDto.setStatus("DISCONNECTING");
|
||||
NodeEntity nodeEntity = new NodeEntity();
|
||||
nodeEntity.setNode(nodeDto);
|
||||
|
|
3
pom.xml
3
pom.xml
|
@ -342,6 +342,9 @@
|
|||
<include>**/Test*.class</include>
|
||||
<include>**/*Spec.class</include>
|
||||
</includes>
|
||||
<excludes>
|
||||
<exclude>**/*ITSpec.class</exclude>
|
||||
</excludes>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
<argLine combine.children="append">-Xmx1G
|
||||
-Djava.net.preferIPv4Stack=true
|
||||
|
|
Loading…
Reference in New Issue