From 65b2a9bc2c91657bc160fe93d6051da95ab5db34 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 10 Feb 2020 14:15:46 -0500 Subject: [PATCH] NIFI-7117: When SocketLoadBalancedFlowFileQueue creates its array of Queue Partitions in the constructor, it added the local partition as the first element in that list. This list should be ordered the same across all nodes in the cluster. By making the local partition the first in the array, each node had a different ordering of these partitions. As a result, Partition by Attribute strategy would constantly rebalance flowfiles that it received to other node, and Single Node always transferred data to the first partition, which was the local node, instead of whichever node should have been the first in the list. This commit addresses this issue by instead inserting the local partition intot he 'queuePartitions' array based on the local node identifier. Signed-off-by: Pierre Villard This closes #4045. --- .../SocketLoadBalancedFlowFileQueue.java | 10 ++- .../CorrelationAttributePartitioner.java | 17 ++++++ .../nifi/tests/system/NiFiClientUtil.java | 5 +- .../system/loadbalance/LoadBalanceIT.java | 61 +++++++++++++++++++ .../impl/client/nifi/ConnectionClient.java | 2 + .../nifi/impl/JerseyConnectionClient.java | 13 +++- 6 files changed, 103 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index e69daad5ce..06c86d1fbf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -151,15 +151,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known, // the queuePartitions array will be recreated with the appropriate partitions. final List partitionList = new ArrayList<>(); - partitionList.add(localPartition); final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier(); for (final NodeIdentifier nodeId : sortedNodeIdentifiers) { if (nodeId.equals(localNodeId)) { - continue; + partitionList.add(localPartition); + } else { + partitionList.add(createRemotePartition(nodeId)); } + } - partitionList.add(createRemotePartition(nodeId)); + // Ensure that our list of queue partitions always contains the local partition. + if (!partitionList.contains(localPartition)) { + partitionList.add(localPartition); } queuePartitions = partitionList.toArray(new QueuePartition[0]); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java index 752909899b..70172cabec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java @@ -19,8 +19,16 @@ package org.apache.nifi.controller.queue.clustered.partition; import com.google.common.hash.Hashing; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; public class CorrelationAttributePartitioner implements FlowFilePartitioner { + private static final Logger logger = LoggerFactory.getLogger(CorrelationAttributePartitioner.class); + private final String partitioningAttribute; public CorrelationAttributePartitioner(final String partitioningAttribute) { @@ -41,6 +49,15 @@ public class CorrelationAttributePartitioner implements FlowFilePartitioner { index = Hashing.consistentHash(hash, partitions.length); } + if (logger.isDebugEnabled()) { + final List partitionDescriptions = new ArrayList<>(partitions.length); + for (final QueuePartition partition : partitions) { + partitionDescriptions.add(partition.getSwapPartitionName()); + } + + logger.debug("Assigning Partition {} to {} based on {}", index, flowFile.getAttribute(CoreAttributes.UUID.key()), partitionDescriptions); + } + return partitions[index]; } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 03e14c6d58..cd268b0e79 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -691,8 +691,11 @@ public class NiFiClientUtil { final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex); final String uuid = flowFileSummary.getUuid(); + final String nodeId = flowFileSummary.getClusterNodeId(); - return nifiClient.getConnectionClient().getFlowFile(connectionId, uuid); + final FlowFileEntity flowFileEntity = nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId); + flowFileEntity.getFlowFile().setClusterNodeId(nodeId); + return flowFileEntity; } public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map variables) throws NiFiClientException, IOException { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java index 2a8b698ee1..a0e93aa07b 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java @@ -31,6 +31,7 @@ import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; import org.apache.nifi.web.api.entity.ClusterEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.junit.Assert; import org.junit.Test; @@ -38,8 +39,10 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LongSummaryStatistics; import java.util.Map; +import java.util.Set; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; @@ -157,6 +160,64 @@ public class LoadBalanceIT extends NiFiSystemIT { assertEquals(numNodes - 1, emptyNodes); } + @Test + public void testPartitionByAttribute() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity count = getClientUtil().createProcessor("CountEvents"); + + final ConnectionEntity connection = getClientUtil().createConnection(generate, count, "success"); + getClientUtil().setAutoTerminatedRelationships(count, "success"); + + // Configure Processor to generate 10 FlowFiles, each 1 MB, on each node, for a total of 20 FlowFiles. + final Map generateProperties = new HashMap<>(); + generateProperties.put("File Size", "1 MB"); + generateProperties.put("Batch Size", "10"); + generateProperties.put("number", "0"); + getClientUtil().updateProcessorProperties(generate, generateProperties); + getClientUtil().updateProcessorExecutionNode(generate, ExecutionNode.PRIMARY); + + // Round Robin between nodes. This should result in 10 FlowFiles on each node. + getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE, LoadBalanceCompression.DO_NOT_COMPRESS, "number"); + + // Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9 + for (int i=1; i <= 10; i++) { + // Generate the data. + getNifiClient().getProcessorClient().startProcessor(generate); + + final int expectedQueueSize = 10 * i; + + // Wait until all 10 FlowFiles are queued up. + waitFor(() -> { + final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId()); + return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == expectedQueueSize; + }); + + + getNifiClient().getProcessorClient().stopProcessor(generate); + getClientUtil().waitForStoppedProcessor(generate.getId()); + + generateProperties.put("number", String.valueOf(i)); + getClientUtil().updateProcessorProperties(generate, generateProperties); + } + + // Wait until load balancing is complete + waitFor(() -> isConnectionDoneLoadBalancing(connection.getId())); + + final Map> nodesByAttribute = new HashMap<>(); + for (int i=0; i < 100; i++) { + final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i); + final String numberValue = flowFile.getFlowFile().getAttributes().get("number"); + final Set nodes = nodesByAttribute.computeIfAbsent(numberValue, key -> new HashSet<>()); + nodes.add(flowFile.getFlowFile().getClusterNodeId()); + } + + assertEquals(10, nodesByAttribute.size()); + for (final Map.Entry> entry : nodesByAttribute.entrySet()) { + final Set nodes = entry.getValue(); + assertEquals("FlowFile with attribute number=" + entry.getKey() + " went to nodes " + nodes, 1, nodes.size()); + } + } + @Test public void testOffload() throws NiFiClientException, IOException, InterruptedException { final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java index a486a65825..f5c07d16cc 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java @@ -47,4 +47,6 @@ public interface ConnectionClient { ListingRequestEntity deleteListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException; FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException; + + FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java index f4180fee60..ffe038e0fb 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java @@ -242,8 +242,15 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn }); } + @Override public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid) throws NiFiClientException, IOException { + return getFlowFile(connectionId, flowFileUuid, null); + } + + + @Override + public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid, final String nodeId) throws NiFiClientException, IOException { if (connectionId == null) { throw new IllegalArgumentException("Connection ID cannot be null"); } @@ -252,11 +259,15 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn } return executeAction("Error retrieving FlowFile", () -> { - final WebTarget target = flowFileQueueTarget + WebTarget target = flowFileQueueTarget .path("flowfiles/{uuid}") .resolveTemplate("id", connectionId) .resolveTemplate("uuid", flowFileUuid); + if (nodeId != null) { + target = target.queryParam("clusterNodeId", nodeId); + } + return getRequestBuilder(target).get(FlowFileEntity.class); }); }