NIFI-6353: If LoadBalanceSession determines remote node's queue is full, mark session complete and penalize the Partition, rather than just penalizing the session. This allows the client to move on to another session, which avoids an issue where backpressure on one queue can prevent data from being transferred between nodes in another queue

This closes #3529.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-06-10 16:06:28 -04:00 committed by Bryan Bende
parent 2465f67dfe
commit 8bed249f37
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 12 additions and 2 deletions

View File

@ -578,7 +578,8 @@ public class LoadBalanceSession {
logger.debug("Peer {} has confirmed that the queue is full for Connection {}", peerDescription, connectionId);
phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
checksum.reset(); // We are restarting the session entirely so we need to reset our checksum
penalize();
complete = true; // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session.
partition.penalize(1000L);
} else {
throw new TransactionAbortedException("After requesting to know whether or not Peer " + peerDescription + " has space available in Connection " + connectionId
+ ", received unexpected response of " + response + ". Aborting transaction.");

View File

@ -347,7 +347,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
try {
RegisteredPartition partition;
while ((partition = partitionQueue.poll()) != null) {
if (partition.isEmpty() || !filter.test(partition)) {
if (partition.isEmpty() || partition.isPenalized() || !filter.test(partition)) {
polledPartitions.add(partition);
continue;
}

View File

@ -33,6 +33,7 @@ public class RegisteredPartition {
private final TransactionCompleteCallback successCallback;
private final Supplier<LoadBalanceCompression> compressionSupplier;
private final BooleanSupplier honorBackpressureSupplier;
private volatile long penaltyExpiration;
public RegisteredPartition(final String connectionId, final BooleanSupplier emptySupplier, final Supplier<FlowFileRecord> flowFileSupplier, final TransactionFailureCallback failureCallback,
final TransactionCompleteCallback successCallback, final Supplier<LoadBalanceCompression> compressionSupplier, final BooleanSupplier honorBackpressureSupplier) {
@ -72,4 +73,12 @@ public class RegisteredPartition {
public boolean isHonorBackpressure() {
return honorBackpressureSupplier.getAsBoolean();
}
public void penalize(final long millis) {
this.penaltyExpiration = System.currentTimeMillis() + millis;
}
public boolean isPenalized() {
return penaltyExpiration > System.currentTimeMillis();
}
}