NIFI-7059: This closes #4007. Fixed bug that resulted in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying queue partitions may not yet have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to the local partition, so the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue creation and the NodeClusterCoordinator learning the Local Node Identifier. Additionally, updated NodeClusterCoordinator so that it will persist its state when it does learn the Local Node Identifier, as that was not being persisted. In testing this, also encountered a deadlock in a particular Integration Test, around AbstractFlowFileQueue and its synchronization. Because the 'synchronized' keyword synchronizes on 'this', and the concrete implementation also uses 'synchronized' on that same 'this', a deadlock can occur that is difficult to understand. As a result, refactored AbstractFlowFileQueue to use read/write locks instead.
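The deadlock described above is easier to see with a stripped-down sketch. The class, method, and lock names below are hypothetical and are not the NiFi code; they only show the shape of the problem: the abstract class guards its fields with the 'this' monitor, while the concrete subclass holds its own internal lock and also declares 'synchronized' methods on the same 'this'.

// Hypothetical sketch of the lock-ordering hazard; not the actual NiFi classes.
abstract class BaseQueue {
    private String loadBalanceStrategy = "DO_NOT_LOAD_BALANCE";

    // Before the fix: state guarded by the object's own monitor ('this').
    public synchronized String getLoadBalanceStrategy() {
        return loadBalanceStrategy;
    }

    public synchronized void setLoadBalanceStrategy(final String strategy) {
        this.loadBalanceStrategy = strategy;
    }
}

class ClusteredQueue extends BaseQueue {
    private final Object partitionLock = new Object();

    // Thread A: acquires partitionLock, then needs the 'this' monitor inside getLoadBalanceStrategy().
    public void rebalance() {
        synchronized (partitionLock) {
            final String strategy = getLoadBalanceStrategy(); // blocks if another thread holds 'this'
            // redistribute FlowFiles according to 'strategy'...
        }
    }

    // Thread B: acquires the 'this' monitor first (synchronized method), then needs partitionLock.
    // If A and B run concurrently, each holds the lock the other needs: a classic ABBA deadlock.
    public synchronized void onNodeAdded() {
        synchronized (partitionLock) {
            // create a partition for the new node...
        }
    }
}

Replacing the base class's 'synchronized' methods with a dedicated ReentrantReadWriteLock, as the AbstractFlowFileQueue diff below does, means those getters and setters no longer compete for the subclass's 'this' monitor, so the cycle above cannot form.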

Signed-off-by: Joe Witt <joewitt@apache.org>
Mark Payne 2020-01-22 12:05:03 -05:00 committed by Joe Witt
parent 379c3fbe51
commit be34767c8a
5 changed files with 156 additions and 35 deletions

NodeClusterCoordinator.java

@@ -259,6 +259,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.nodeId = nodeId;
nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId));
storeState();
}
@Override

AbstractFlowFileQueue.java

@@ -44,6 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
@@ -62,6 +65,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
private String partitioningAttribute = null;
private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock();
private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock();
private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock();
private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
@@ -423,32 +430,57 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
}
@Override
public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
loadBalanceWriteLock.lock();
try {
if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
}
this.loadBalanceStrategy = strategy;
this.partitioningAttribute = partitioningAttribute;
} finally {
loadBalanceWriteLock.unlock();
}
this.loadBalanceStrategy = strategy;
this.partitioningAttribute = partitioningAttribute;
}
@Override
public synchronized String getPartitioningAttribute() {
return partitioningAttribute;
public String getPartitioningAttribute() {
loadBalanceReadLock.lock();
try {
return partitioningAttribute;
} finally {
loadBalanceReadLock.unlock();
}
}
@Override
public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
return loadBalanceStrategy;
public LoadBalanceStrategy getLoadBalanceStrategy() {
loadBalanceReadLock.lock();
try {
return loadBalanceStrategy;
} finally {
loadBalanceReadLock.unlock();
}
}
@Override
public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
this.compression = compression;
loadBalanceWriteLock.lock();
try {
this.compression = compression;
} finally {
loadBalanceWriteLock.unlock();
}
}
@Override
public synchronized LoadBalanceCompression getLoadBalanceCompression() {
return compression;
loadBalanceReadLock.lock();
try {
return compression;
} finally {
loadBalanceReadLock.unlock();
}
}
}

QueuePrioritizer.java

@@ -17,15 +17,15 @@
package org.apache.nifi.controller.queue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
private static final long serialVersionUID = 1L;
private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
@@ -68,6 +68,7 @@ public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializabl
final ContentClaim claim1 = f1.getContentClaim();
final ContentClaim claim2 = f2.getContentClaim();
// put the one without a claim first
if (claim1 == null && claim2 != null) {
return -1;

SocketLoadBalancedFlowFileQueue.java

@@ -145,19 +145,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
if (sortedNodeIdentifiers.isEmpty()) {
// No Node Identifiers are known yet. Just create the partitions using the local partition.
queuePartitions = new QueuePartition[] { localPartition };
} else {
queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
// The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node
// that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known,
// the queuePartitions array will be recreated with the appropriate partitions.
final List<QueuePartition> partitionList = new ArrayList<>();
partitionList.add(localPartition);
for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
queuePartitions[i] = localPartition;
} else {
queuePartitions[i] = createRemotePartition(nodeId);
final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
if (nodeId.equals(localNodeId)) {
continue;
}
partitionList.add(createRemotePartition(nodeId));
}
queuePartitions = partitionList.toArray(new QueuePartition[0]);
}
partitioner = new LocalPartitionPartitioner();

TestSocketLoadBalancedFlowFileQueue.java

@@ -37,6 +37,8 @@ import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.junit.Assert;
import org.junit.Before;
@@ -136,6 +138,78 @@ public class TestSocketLoadBalancedFlowFileQueue {
"localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
}
@Test
public void testPriorities() {
final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
final int i1 = Integer.parseInt(o1.getAttribute("i"));
final int i2 = Integer.parseInt(o2.getAttribute("i"));
return Integer.compare(i1, i2);
}
};
queue.setPriorities(Collections.singletonList(iValuePrioritizer));
final Map<String, String> attributes = new HashMap<>();
// Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
for (int i = 99; i >= 0; i--) {
attributes.put("i", String.valueOf(i));
final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
queue.put(flowFile);
}
for (int i=0; i < 100; i++) {
final FlowFileRecord polled = queue.poll(Collections.emptySet());
assertNotNull(polled);
assertEquals(String.valueOf(i), polled.getAttribute("i"));
}
assertNull(queue.poll(Collections.emptySet()));
}
@Test
public void testPrioritiesWhenSetBeforeLocalNodeIdDetermined() {
final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
final int i1 = Integer.parseInt(o1.getAttribute("i"));
final int i2 = Integer.parseInt(o2.getAttribute("i"));
return Integer.compare(i1, i2);
}
};
final ProcessScheduler scheduler = mock(ProcessScheduler.class);
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
queue.setPriorities(Collections.singletonList(iValuePrioritizer));
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
queue.setNodeIdentifiers(new HashSet<>(nodeIds), true);
final Map<String, String> attributes = new HashMap<>();
// Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
for (int i = 99; i >= 0; i--) {
attributes.put("i", String.valueOf(i));
final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
queue.put(flowFile);
}
for (int i=0; i < 100; i++) {
final FlowFileRecord polled = queue.poll(Collections.emptySet());
assertNotNull(polled);
assertEquals(String.valueOf(i), polled.getAttribute("i"));
}
assertNull(queue.poll(Collections.emptySet()));
}
@Test
public void testBinsAccordingToPartitioner() {
final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1);
@@ -395,8 +469,8 @@ public class TestSocketLoadBalancedFlowFileQueue {
assertPartitionSizes(expectedPartitionSizes);
}
@Test(timeout = 100000)
public void testLocalNodeIdentifierSet() throws InterruptedException {
@Test(timeout = 10000)
public void testDataInRemotePartitionForLocalIdIsMovedToLocalPartition() throws InterruptedException {
nodeIds.clear();
final NodeIdentifier id1 = createNodeIdentifier();
@@ -410,7 +484,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo,
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
queue.setFlowFilePartitioner(new RoundRobinPartitioner());
@@ -421,22 +495,28 @@ public class TestSocketLoadBalancedFlowFileQueue {
queue.put(new MockFlowFileRecord(attributes, 0));
}
for (int i=0; i < 3; i++) {
assertEquals(2, queue.getPartition(i).size().getObjectCount());
}
assertEquals(0, queue.getLocalPartition().size().getObjectCount());
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);
assertPartitionSizes(new int[] {2, 2, 2});
assertEquals(6, queue.size().getObjectCount());
while (queue.getLocalPartition().size().getObjectCount() != 2) {
Thread.sleep(10L);
// Ensure that the partitions' object sizes add up to 6. This could take a short time because rebalancing will occur.
// So we wait in a loop.
while (true) {
int totalObjectCount = 0;
for (int i = 0; i < queue.getPartitionCount(); i++) {
totalObjectCount += queue.getPartition(i).size().getObjectCount();
}
if (totalObjectCount == 6) {
break;
}
}
assertEquals(3, queue.getPartitionCount());
}
private void assertPartitionSizes(final int[] expectedSizes) {
final int[] partitionSizes = new int[queue.getPartitionCount()];
while (!Arrays.equals(expectedSizes, partitionSizes)) {