NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to failure the transaction again, decrementing the count of FlowFiles for the connection more than we should.

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5564.
This commit is contained in:
Mark Payne 2021-12-02 11:21:36 -05:00 committed by Joe Gresock
parent 90b39b593a
commit f95044ff87
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
3 changed files with 13 additions and 11 deletions

View File

@ -43,7 +43,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
@ -119,8 +118,10 @@ public class LoadBalanceSession {
return phase.getRequiredSelectionKey();
}
public synchronized List<FlowFileRecord> getFlowFilesSent() {
return Collections.unmodifiableList(flowFilesSent);
public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
final List<FlowFileRecord> copy = new ArrayList<>(flowFilesSent);
flowFilesSent.clear();
return copy;
}
public synchronized boolean isComplete() {

View File

@ -129,11 +129,12 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
if (validSession && !loadBalanceSession.isComplete()) {
// Attempt to cancel the session. If successful, trigger the failure callback for the partition.
// If not successful, it indicates that another thread has completed the session and is responsible or the transaction success/failure
if (loadBalanceSession.cancel()) {
final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getFlowFilesSent();
final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getAndPurgeFlowFilesSent();
logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", this, flowFilesSent.size(), removedPartition);
removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
@ -268,7 +269,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
loadBalanceSession.getPartition().getConnectionId() + " due to " + e);
penalize();
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getAndPurgeFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
close();
return false;
@ -278,7 +279,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
} while (success);
if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier);
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier);
}
return anySuccess;
@ -311,10 +312,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
loadBalanceSession = null;
logger.debug("Node {} disconnected so will terminate the Load Balancing Session", nodeIdentifier);
final List<FlowFileRecord> flowFilesSent = session.getFlowFilesSent();
final List<FlowFileRecord> flowFilesSent = session.getAndPurgeFlowFilesSent();
if (!flowFilesSent.isEmpty()) {
session.getPartition().getFailureCallback().onTransactionFailed(session.getFlowFilesSent(), TransactionFailureCallback.TransactionPhase.SENDING);
session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
}
close();

View File

@ -190,7 +190,7 @@ public class TestLoadBalanceSession {
assertArrayEquals(expectedSent, dataSent);
assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getFlowFilesSent());
assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getAndPurgeFlowFilesSent());
}
@ -271,6 +271,6 @@ public class TestLoadBalanceSession {
assertArrayEquals(expectedSent, dataSent);
assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
assertEquals(Arrays.asList(flowFile1), transaction.getAndPurgeFlowFilesSent());
}
}