diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index bf1a1ab1fd..3965b8683e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -543,7 +543,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn LOG.debug("Successfully committed session {} for {}", this, connectableDescription); } - private void commit(final boolean asynchronous) { + private synchronized void commit(final boolean asynchronous) { checkpoint(this.checkpoint != null); // If a checkpoint already exists, we need to copy the collection commit(this.checkpoint, asynchronous); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java index 8b068cd299..56283b91d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java @@ -45,7 +45,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.OptionalInt; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -64,7 +63,6 @@ import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalancePro public class LoadBalanceSession { private static final Logger logger = LoggerFactory.getLogger(LoadBalanceSession.class); static final int MAX_DATA_FRAME_SIZE = 65535; - private static final long PENALTY_MILLIS = TimeUnit.SECONDS.toMillis(2L); private final RegisteredPartition partition; private final Supplier flowFileSupplier; @@ -75,7 +73,6 @@ public class LoadBalanceSession { private final String peerDescription; private final String connectionId; private final TransactionThreshold transactionThreshold; - private volatile boolean canceled = false; final VersionNegotiator negotiator = new StandardVersionNegotiator(1); private int protocolVersion = 1; @@ -85,13 +82,12 @@ public class LoadBalanceSession { // guarded by synchronizing on 'this' private ByteBuffer preparedFrame; private FlowFileRecord currentFlowFile; - private List flowFilesSent = new ArrayList<>(); + private final List flowFilesSent = new ArrayList<>(); private TransactionPhase phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION; private InputStream flowFileInputStream; - private byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE]; - private boolean complete = false; + private final byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE]; private long readTimeout; - private long penaltyExpiration = -1L; + private volatile LoadBalanceSessionState sessionState = LoadBalanceSessionState.ACTIVE; public LoadBalanceSession(final RegisteredPartition partition, final FlowFileContentAccess contentAccess, final LoadBalanceFlowFileCodec flowFileCodec, final PeerChannel peerChannel, final int timeoutMillis, final TransactionThreshold transactionThreshold) { @@ -124,17 +120,12 @@ public class LoadBalanceSession { return copy; } - public synchronized boolean isComplete() { - return complete; + public synchronized LoadBalanceSessionState getSessionState() { + return sessionState; } public synchronized boolean communicate() throws IOException { - if (isComplete()) { - return false; - } - - if (isPenalized()) { - logger.debug("Will not communicate with Peer {} for Connection {} because session is penalized", peerDescription, connectionId); + if (sessionState.isComplete()) { return false; } @@ -167,25 +158,20 @@ public class LoadBalanceSession { final int bytesWritten = channel.write(preparedFrame); return bytesWritten > 0; } catch (final Exception e) { - complete = true; + sessionState = LoadBalanceSessionState.COMPLETED_EXCEPTIONALLY; throw e; } } public synchronized boolean cancel() { - if (complete) { + if (sessionState.isComplete()) { return false; } - complete = true; - canceled = true; + sessionState = LoadBalanceSessionState.CANCELED; return true; } - public boolean isCanceled() { - return canceled; - } - private boolean confirmTransactionComplete() throws IOException { logger.debug("Confirming Transaction Complete for Peer {}", peerDescription); @@ -210,7 +196,7 @@ public class LoadBalanceSession { throw new IOException("Expected a CONFIRM_COMPLETE_TRANSACTION response from Peer " + peerDescription + " but received a value of " + response); } - complete = true; + sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY; logger.debug("Successfully completed Transaction to send {} FlowFiles to Peer {} for Connection {}", flowFilesSent.size(), peerDescription, connectionId); return true; @@ -496,21 +482,19 @@ public class LoadBalanceSession { protocolVersion = requestedVersion; phase = TransactionPhase.SEND_CONNECTION_ID; logger.debug("Peer {} recommended Protocol Version of {}. Accepting version.", peerDescription, requestedVersion); - - return true; } else { final Integer preferred = negotiator.getPreferredVersion(requestedVersion); if (preferred == null) { logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", peerDescription, requestedVersion); phase = TransactionPhase.ABORT_PROTOCOL_NEGOTIATION; - return true; } else { logger.debug("Peer {} requested version {} of the Protocol. Recommending version {} instead", peerDescription, requestedVersion, preferred); protocolVersion = preferred; phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION; - return true; } } + + return true; } private ByteBuffer noMoreFlowFiles() { @@ -592,7 +576,9 @@ public class LoadBalanceSession { logger.debug("Peer {} has confirmed that the queue is full for Connection {}", peerDescription, connectionId); phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION; checksum.reset(); // We are restarting the session entirely so we need to reset our checksum - complete = true; // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session. + + // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session. + sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY; partition.penalize(1000L); } else { throw new TransactionAbortedException("After requesting to know whether or not Peer " + peerDescription + " has space available in Connection " + connectionId @@ -602,15 +588,6 @@ public class LoadBalanceSession { return true; } - private void penalize() { - penaltyExpiration = System.currentTimeMillis() + PENALTY_MILLIS; - } - - private boolean isPenalized() { - // check for penaltyExpiration > -1L is not strictly necessary as it's implied by the second check but is still - // here because it's more efficient to check this than to make the system call to System.currentTimeMillis(). - return penaltyExpiration > -1L && System.currentTimeMillis() < penaltyExpiration; - } private enum TransactionPhase { @@ -653,4 +630,23 @@ public class LoadBalanceSession { return requiredSelectionKey; } } + + public enum LoadBalanceSessionState { + ACTIVE(false), + + COMPLETED_SUCCESSFULLY(true), + + COMPLETED_EXCEPTIONALLY(true), + + CANCELED(true); + + private final boolean complete; + LoadBalanceSessionState(final boolean complete) { + this.complete = complete; + } + + public boolean isComplete() { + return complete; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index 4412987216..3ff1ef0827 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -130,7 +130,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition); final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId()); - if (validSession && !loadBalanceSession.isComplete()) { + if (validSession && !loadBalanceSession.getSessionState().isComplete()) { // Attempt to cancel the session. If successful, trigger the failure callback for the partition. // If not successful, it indicates that another thread has completed the session and is responsible or the transaction success/failure if (loadBalanceSession.cancel()) { @@ -278,7 +278,8 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { anySuccess = anySuccess || success; } while (success); - if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) { + final LoadBalanceSession.LoadBalanceSessionState sessionState = loadBalanceSession.getSessionState(); + if (sessionState.isComplete() && sessionState != LoadBalanceSession.LoadBalanceSessionState.CANCELED) { loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier); } @@ -356,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { } private synchronized LoadBalanceSession getFailoverSession() { - if (loadBalanceSession != null && !loadBalanceSession.isComplete()) { + if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) { return loadBalanceSession; } @@ -402,7 +403,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { } private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) { - if (loadBalanceSession != null && !loadBalanceSession.isComplete()) { + if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) { return loadBalanceSession; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java index 43a3cfe3ec..d43ee2be44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java @@ -136,7 +136,7 @@ public class TestLoadBalanceSession { while (transaction.communicate()) { } - assertTrue(transaction.isComplete()); + assertTrue(transaction.getSessionState().isComplete()); socketChannel.close(); final Checksum expectedChecksum = new CRC32(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index ce38273d65..187bcec634 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -195,7 +195,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .name("max.bin.count") .displayName("Maximum Number of Bins") .description("Specifies the maximum number of bins that can be held in memory at any one time. " - + "This number should not be smaller than the maximum number of conurrent threads for this Processor, " + + "This number should not be smaller than the maximum number of concurrent threads for this Processor, " + "or the bins that are created will often consist only of a single incoming FlowFile.") .defaultValue("10") .required(true)