NIFI-7117: When SocketLoadBalancedFlowFileQueue creates its array of Queue Partitions in the constructor, it added the local partition as the first element in that list. This list should be ordered the same across all nodes in the cluster. By making the local partition the first in the array, each node had a different ordering of these partitions. As a result, Partition by Attribute strategy would constantly rebalance flowfiles that it received to other node, and Single Node always transferred data to the first partition, which was the local node, instead of whichever node should have been the first in the list. This commit addresses this issue by instead inserting the local partition intot he 'queuePartitions' array based on the local node identifier.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4045.
This commit is contained in:
Mark Payne 2020-02-10 14:15:46 -05:00 committed by Pierre Villard
parent 5e964fbc47
commit 65b2a9bc2c
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
6 changed files with 103 additions and 5 deletions

View File

@ -151,15 +151,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
// that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known, // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known,
// the queuePartitions array will be recreated with the appropriate partitions. // the queuePartitions array will be recreated with the appropriate partitions.
final List<QueuePartition> partitionList = new ArrayList<>(); final List<QueuePartition> partitionList = new ArrayList<>();
partitionList.add(localPartition);
final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier(); final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
for (final NodeIdentifier nodeId : sortedNodeIdentifiers) { for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
if (nodeId.equals(localNodeId)) { if (nodeId.equals(localNodeId)) {
continue; partitionList.add(localPartition);
} else {
partitionList.add(createRemotePartition(nodeId));
} }
}
partitionList.add(createRemotePartition(nodeId)); // Ensure that our list of queue partitions always contains the local partition.
if (!partitionList.contains(localPartition)) {
partitionList.add(localPartition);
} }
queuePartitions = partitionList.toArray(new QueuePartition[0]); queuePartitions = partitionList.toArray(new QueuePartition[0]);

View File

@ -19,8 +19,16 @@ package org.apache.nifi.controller.queue.clustered.partition;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class CorrelationAttributePartitioner implements FlowFilePartitioner { public class CorrelationAttributePartitioner implements FlowFilePartitioner {
private static final Logger logger = LoggerFactory.getLogger(CorrelationAttributePartitioner.class);
private final String partitioningAttribute; private final String partitioningAttribute;
public CorrelationAttributePartitioner(final String partitioningAttribute) { public CorrelationAttributePartitioner(final String partitioningAttribute) {
@ -41,6 +49,15 @@ public class CorrelationAttributePartitioner implements FlowFilePartitioner {
index = Hashing.consistentHash(hash, partitions.length); index = Hashing.consistentHash(hash, partitions.length);
} }
if (logger.isDebugEnabled()) {
final List<String> partitionDescriptions = new ArrayList<>(partitions.length);
for (final QueuePartition partition : partitions) {
partitionDescriptions.add(partition.getSwapPartitionName());
}
logger.debug("Assigning Partition {} to {} based on {}", index, flowFile.getAttribute(CoreAttributes.UUID.key()), partitionDescriptions);
}
return partitions[index]; return partitions[index];
} }

View File

@ -691,8 +691,11 @@ public class NiFiClientUtil {
final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex); final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex);
final String uuid = flowFileSummary.getUuid(); final String uuid = flowFileSummary.getUuid();
final String nodeId = flowFileSummary.getClusterNodeId();
return nifiClient.getConnectionClient().getFlowFile(connectionId, uuid); final FlowFileEntity flowFileEntity = nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId);
flowFileEntity.getFlowFile().setClusterNodeId(nodeId);
return flowFileEntity;
} }
public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException { public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {

View File

@ -31,6 +31,7 @@ import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ClusterEntity; import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -38,8 +39,10 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LongSummaryStatistics; import java.util.LongSummaryStatistics;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -157,6 +160,64 @@ public class LoadBalanceIT extends NiFiSystemIT {
assertEquals(numNodes - 1, emptyNodes); assertEquals(numNodes - 1, emptyNodes);
} }
@Test
public void testPartitionByAttribute() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
final ConnectionEntity connection = getClientUtil().createConnection(generate, count, "success");
getClientUtil().setAutoTerminatedRelationships(count, "success");
// Configure Processor to generate 10 FlowFiles, each 1 MB, on each node, for a total of 20 FlowFiles.
final Map<String, String> generateProperties = new HashMap<>();
generateProperties.put("File Size", "1 MB");
generateProperties.put("Batch Size", "10");
generateProperties.put("number", "0");
getClientUtil().updateProcessorProperties(generate, generateProperties);
getClientUtil().updateProcessorExecutionNode(generate, ExecutionNode.PRIMARY);
// Round Robin between nodes. This should result in 10 FlowFiles on each node.
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE, LoadBalanceCompression.DO_NOT_COMPRESS, "number");
// Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9
for (int i=1; i <= 10; i++) {
// Generate the data.
getNifiClient().getProcessorClient().startProcessor(generate);
final int expectedQueueSize = 10 * i;
// Wait until all 10 FlowFiles are queued up.
waitFor(() -> {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == expectedQueueSize;
});
getNifiClient().getProcessorClient().stopProcessor(generate);
getClientUtil().waitForStoppedProcessor(generate.getId());
generateProperties.put("number", String.valueOf(i));
getClientUtil().updateProcessorProperties(generate, generateProperties);
}
// Wait until load balancing is complete
waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
final Map<String, Set<String>> nodesByAttribute = new HashMap<>();
for (int i=0; i < 100; i++) {
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i);
final String numberValue = flowFile.getFlowFile().getAttributes().get("number");
final Set<String> nodes = nodesByAttribute.computeIfAbsent(numberValue, key -> new HashSet<>());
nodes.add(flowFile.getFlowFile().getClusterNodeId());
}
assertEquals(10, nodesByAttribute.size());
for (final Map.Entry<String, Set<String>> entry : nodesByAttribute.entrySet()) {
final Set<String> nodes = entry.getValue();
assertEquals("FlowFile with attribute number=" + entry.getKey() + " went to nodes " + nodes, 1, nodes.size());
}
}
@Test @Test
public void testOffload() throws NiFiClientException, IOException, InterruptedException { public void testOffload() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");

View File

@ -47,4 +47,6 @@ public interface ConnectionClient {
ListingRequestEntity deleteListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException; ListingRequestEntity deleteListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException;
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException; FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException;
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
} }

View File

@ -242,8 +242,15 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
}); });
} }
@Override @Override
public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid) throws NiFiClientException, IOException { public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid) throws NiFiClientException, IOException {
return getFlowFile(connectionId, flowFileUuid, null);
}
@Override
public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
if (connectionId == null) { if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null"); throw new IllegalArgumentException("Connection ID cannot be null");
} }
@ -252,11 +259,15 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
} }
return executeAction("Error retrieving FlowFile", () -> { return executeAction("Error retrieving FlowFile", () -> {
final WebTarget target = flowFileQueueTarget WebTarget target = flowFileQueueTarget
.path("flowfiles/{uuid}") .path("flowfiles/{uuid}")
.resolveTemplate("id", connectionId) .resolveTemplate("id", connectionId)
.resolveTemplate("uuid", flowFileUuid); .resolveTemplate("uuid", flowFileUuid);
if (nodeId != null) {
target = target.queryParam("clusterNodeId", nodeId);
}
return getRequestBuilder(target).get(FlowFileEntity.class); return getRequestBuilder(target).get(FlowFileEntity.class);
}); });
} }