NIFI-9835: Fixed threading bug in which NioAsyncLoadBalanceClient calls LoadBalanceSession.isComplete() followed by LoadBalanceSession.isCanceled() but it's possible for the complete flag to change before the canceled flag (they are not updated atomically). So changed to use a single LoadBalanceSessionState enum that represents the state. Also made the private StandardProcessSession.commit(boolean) method synchronized. When a processor is terminated (as is the case in Offload), we roll back sessions and both the commit() and rollback() need to be synchronized. Only the public commit() method was synchronized, and now with commitAsync() happening, we had the ability to commit without any synchronization. This addresses that concern. Also fixed a typo in docs for MergeRecord.

This closes #5902

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-03-25 11:43:03 -04:00 committed by exceptionfactory
parent 2ac90a6441
commit 63f02c99bf
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 42 additions and 45 deletions

View File

@ -543,7 +543,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
LOG.debug("Successfully committed session {} for {}", this, connectableDescription);
}
private void commit(final boolean asynchronous) {
private synchronized void commit(final boolean asynchronous) {
checkpoint(this.checkpoint != null); // If a checkpoint already exists, we need to copy the collection
commit(this.checkpoint, asynchronous);

View File

@ -45,7 +45,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@ -64,7 +63,6 @@ import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalancePro
public class LoadBalanceSession {
private static final Logger logger = LoggerFactory.getLogger(LoadBalanceSession.class);
static final int MAX_DATA_FRAME_SIZE = 65535;
private static final long PENALTY_MILLIS = TimeUnit.SECONDS.toMillis(2L);
private final RegisteredPartition partition;
private final Supplier<FlowFileRecord> flowFileSupplier;
@ -75,7 +73,6 @@ public class LoadBalanceSession {
private final String peerDescription;
private final String connectionId;
private final TransactionThreshold transactionThreshold;
private volatile boolean canceled = false;
final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
private int protocolVersion = 1;
@ -85,13 +82,12 @@ public class LoadBalanceSession {
// guarded by synchronizing on 'this'
private ByteBuffer preparedFrame;
private FlowFileRecord currentFlowFile;
private List<FlowFileRecord> flowFilesSent = new ArrayList<>();
private final List<FlowFileRecord> flowFilesSent = new ArrayList<>();
private TransactionPhase phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
private InputStream flowFileInputStream;
private byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE];
private boolean complete = false;
private final byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE];
private long readTimeout;
private long penaltyExpiration = -1L;
private volatile LoadBalanceSessionState sessionState = LoadBalanceSessionState.ACTIVE;
public LoadBalanceSession(final RegisteredPartition partition, final FlowFileContentAccess contentAccess, final LoadBalanceFlowFileCodec flowFileCodec, final PeerChannel peerChannel,
final int timeoutMillis, final TransactionThreshold transactionThreshold) {
@ -124,17 +120,12 @@ public class LoadBalanceSession {
return copy;
}
public synchronized boolean isComplete() {
return complete;
public synchronized LoadBalanceSessionState getSessionState() {
return sessionState;
}
public synchronized boolean communicate() throws IOException {
if (isComplete()) {
return false;
}
if (isPenalized()) {
logger.debug("Will not communicate with Peer {} for Connection {} because session is penalized", peerDescription, connectionId);
if (sessionState.isComplete()) {
return false;
}
@ -167,25 +158,20 @@ public class LoadBalanceSession {
final int bytesWritten = channel.write(preparedFrame);
return bytesWritten > 0;
} catch (final Exception e) {
complete = true;
sessionState = LoadBalanceSessionState.COMPLETED_EXCEPTIONALLY;
throw e;
}
}
public synchronized boolean cancel() {
if (complete) {
if (sessionState.isComplete()) {
return false;
}
complete = true;
canceled = true;
sessionState = LoadBalanceSessionState.CANCELED;
return true;
}
public boolean isCanceled() {
return canceled;
}
private boolean confirmTransactionComplete() throws IOException {
logger.debug("Confirming Transaction Complete for Peer {}", peerDescription);
@ -210,7 +196,7 @@ public class LoadBalanceSession {
throw new IOException("Expected a CONFIRM_COMPLETE_TRANSACTION response from Peer " + peerDescription + " but received a value of " + response);
}
complete = true;
sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
logger.debug("Successfully completed Transaction to send {} FlowFiles to Peer {} for Connection {}", flowFilesSent.size(), peerDescription, connectionId);
return true;
@ -496,21 +482,19 @@ public class LoadBalanceSession {
protocolVersion = requestedVersion;
phase = TransactionPhase.SEND_CONNECTION_ID;
logger.debug("Peer {} recommended Protocol Version of {}. Accepting version.", peerDescription, requestedVersion);
return true;
} else {
final Integer preferred = negotiator.getPreferredVersion(requestedVersion);
if (preferred == null) {
logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", peerDescription, requestedVersion);
phase = TransactionPhase.ABORT_PROTOCOL_NEGOTIATION;
return true;
} else {
logger.debug("Peer {} requested version {} of the Protocol. Recommending version {} instead", peerDescription, requestedVersion, preferred);
protocolVersion = preferred;
phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
return true;
}
}
return true;
}
private ByteBuffer noMoreFlowFiles() {
@ -592,7 +576,9 @@ public class LoadBalanceSession {
logger.debug("Peer {} has confirmed that the queue is full for Connection {}", peerDescription, connectionId);
phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
checksum.reset(); // We are restarting the session entirely so we need to reset our checksum
complete = true; // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session.
// consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session.
sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
partition.penalize(1000L);
} else {
throw new TransactionAbortedException("After requesting to know whether or not Peer " + peerDescription + " has space available in Connection " + connectionId
@ -602,15 +588,6 @@ public class LoadBalanceSession {
return true;
}
private void penalize() {
penaltyExpiration = System.currentTimeMillis() + PENALTY_MILLIS;
}
private boolean isPenalized() {
// check for penaltyExpiration > -1L is not strictly necessary as it's implied by the second check but is still
// here because it's more efficient to check this than to make the system call to System.currentTimeMillis().
return penaltyExpiration > -1L && System.currentTimeMillis() < penaltyExpiration;
}
private enum TransactionPhase {
@ -653,4 +630,23 @@ public class LoadBalanceSession {
return requiredSelectionKey;
}
}
/**
 * Single-valued lifecycle state of a {@code LoadBalanceSession}.
 *
 * <p>One enum value carries both "is the session finished?" and "how did it finish?",
 * so a caller that reads the state once gets a consistent answer to both questions.</p>
 */
public enum LoadBalanceSessionState {
    /** The session has not finished; this is the state a session starts in. */
    ACTIVE(false),

    /** The session finished normally (transaction confirmed, or the peer reported a full queue). */
    COMPLETED_SUCCESSFULLY(true),

    /** The session finished because communication with the peer threw an exception. */
    COMPLETED_EXCEPTIONALLY(true),

    /** The session was canceled before it finished on its own. */
    CANCELED(true);

    // true for every state in which no further communication will take place
    private final boolean terminal;

    LoadBalanceSessionState(final boolean terminal) {
        this.terminal = terminal;
    }

    /**
     * @return {@code true} if this state is one of the finished states, {@code false} for {@link #ACTIVE}
     */
    public boolean isComplete() {
        return terminal;
    }
}
}

View File

@ -130,7 +130,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
if (validSession && !loadBalanceSession.isComplete()) {
if (validSession && !loadBalanceSession.getSessionState().isComplete()) {
// Attempt to cancel the session. If successful, trigger the failure callback for the partition.
// If not successful, it indicates that another thread has completed the session and is responsible for the transaction success/failure
if (loadBalanceSession.cancel()) {
@ -278,7 +278,8 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
anySuccess = anySuccess || success;
} while (success);
if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
final LoadBalanceSession.LoadBalanceSessionState sessionState = loadBalanceSession.getSessionState();
if (sessionState.isComplete() && sessionState != LoadBalanceSession.LoadBalanceSessionState.CANCELED) {
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier);
}
@ -356,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
private synchronized LoadBalanceSession getFailoverSession() {
if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) {
return loadBalanceSession;
}
@ -402,7 +403,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) {
if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) {
return loadBalanceSession;
}

View File

@ -136,7 +136,7 @@ public class TestLoadBalanceSession {
while (transaction.communicate()) {
}
assertTrue(transaction.isComplete());
assertTrue(transaction.getSessionState().isComplete());
socketChannel.close();
final Checksum expectedChecksum = new CRC32();

View File

@ -195,7 +195,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
.name("max.bin.count")
.displayName("Maximum Number of Bins")
.description("Specifies the maximum number of bins that can be held in memory at any one time. "
+ "This number should not be smaller than the maximum number of conurrent threads for this Processor, "
+ "This number should not be smaller than the maximum number of concurrent threads for this Processor, "
+ "or the bins that are created will often consist only of a single incoming FlowFile.")
.defaultValue("10")
.required(true)