diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java index 0c8f8b6806..6e327cd86d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java @@ -17,10 +17,11 @@ package org.apache.nifi.controller.queue.clustered.client.async; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.repository.FlowFileRecord; import java.util.List; public interface TransactionCompleteCallback { - void onTransactionComplete(List flowFilesSent); + void onTransactionComplete(List flowFilesSent, NodeIdentifier nodeIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index 753c1f49a3..e55dfcd78c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -256,7 +256,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { } while (success); if (loadBalanceSession.isComplete()) { - loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent()); + loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier); } return anySuccess; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java index a78de553df..854a3a52e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java @@ -173,7 +173,8 @@ public class RemoteQueuePartition implements QueuePartition { final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, flowFile); repoRecord.markForAbort(); - updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord)); + // We can send 'null' for the node identifier only because the list of FlowFiles sent is empty. + updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord), null); // If unable to even connect to the node, go ahead and transfer all FlowFiles for this queue to the failure destination. // In either case, transfer those FlowFiles that we failed to send. @@ -204,12 +205,12 @@ public class RemoteQueuePartition implements QueuePartition { final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() { @Override - public void onTransactionComplete(final List flowFilesSent) { + public void onTransactionComplete(final List flowFilesSent, final NodeIdentifier nodeIdentifier) { // We've now completed the transaction. We must now update the repositories and "keep the books", acknowledging the FlowFiles // with the queue so that its size remains accurate. priorityQueue.acknowledge(flowFilesSent); flowFileQueue.onTransfer(flowFilesSent); - updateRepositories(flowFilesSent, Collections.emptyList()); + updateRepositories(flowFilesSent, Collections.emptyList(), nodeIdentifier); } }; @@ -230,7 +231,7 @@ public class RemoteQueuePartition implements QueuePartition { * @param flowFilesSent the FlowFiles that were sent to another node. * @param abortedRecords the Repository Records for any FlowFile whose content was missing. */ - private void updateRepositories(final List flowFilesSent, final Collection abortedRecords) { + private void updateRepositories(final List flowFilesSent, final Collection abortedRecords, final NodeIdentifier nodeIdentifier) { // We update the Provenance Repository first. This way, even if we restart before we update the FlowFile repo, we have the record // that the data was sent in the Provenance Repository. We then update the content claims and finally the FlowFile Repository. We do it // in this order so that when the FlowFile repo is sync'ed to disk, we know which Content Claims are no longer in use. Updating the FlowFile @@ -242,7 +243,7 @@ public class RemoteQueuePartition implements QueuePartition { // are ever created. final List provenanceEvents = new ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size()); for (final FlowFileRecord sent : flowFilesSent) { - provenanceEvents.add(createSendEvent(sent)); + provenanceEvents.add(createSendEvent(sent, nodeIdentifier)); provenanceEvents.add(createDropEvent(sent)); } @@ -279,7 +280,7 @@ public class RemoteQueuePartition implements QueuePartition { return record; } - private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile) { + private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile, final NodeIdentifier nodeIdentifier) { final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder() .fromFlowFile(flowFile) @@ -289,7 +290,7 @@ public class RemoteQueuePartition implements QueuePartition { .setComponentType("Connection") .setSourceQueueIdentifier(flowFileQueue.getIdentifier()) .setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key())) - .setTransitUri("nifi:connection:" + flowFileQueue.getIdentifier()); + .setTransitUri("nifi://" + nodeIdentifier.getApiAddress() + "/loadbalance/" + flowFileQueue.getIdentifier()); final ContentClaim contentClaim = flowFile.getContentClaim(); if (contentClaim != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java index 43187b5e12..f0d51c8612 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java @@ -40,10 +40,10 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer { } @Override - public void authorize(final Collection clientIdentities) throws NotAuthorizedException { + public String authorize(final Collection clientIdentities) throws NotAuthorizedException { if (clientIdentities == null) { logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing"); - return; + return null; } final Set nodeIds = clusterCoordinator.getNodeIdentifiers().stream() @@ -53,7 +53,7 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer { for (final String clientId : clientIdentities) { if (nodeIds.contains(clientId)) { logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId); - return; + return clientId; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java index 3a716e2035..3abd328936 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java @@ -20,5 +20,13 @@ package org.apache.nifi.controller.queue.clustered.server; import java.util.Collection; public interface LoadBalanceAuthorizer { - void authorize(Collection clientIdentities) throws NotAuthorizedException; + /** + * Checks if any of the given identities is allowed to load balance data. If so, the identity that has been + * permitted is returned. If not, a NotAuthorizedException is thrown. + * + * @param clientIdentities the collection of identities to check + * @return the identity that is authorized, or null if the given collection of identities is null + * @throws NotAuthorizedException if none of the given identities is authorized to load balance data + */ + String authorize(Collection clientIdentities) throws NotAuthorizedException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index 0f032dfefb..dda71de58d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -123,16 +123,13 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { final InputStream in = new BufferedInputStream(socket.getInputStream()); final OutputStream out = new BufferedOutputStream(socket.getOutputStream()); - String peerDescription = socket.getInetAddress().toString(); + String peerDescription = socket.getInetAddress().getHostName(); if (socket instanceof SSLSocket) { final SSLSession sslSession = ((SSLSocket) socket).getSession(); final Set certIdentities; try { certIdentities = getCertificateIdentities(sslSession); - - final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket); - peerDescription = CertificateUtils.extractUsername(dn); } catch (final CertificateException e) { throw new IOException("Failed to extract Client Certificate", e); } @@ -140,7 +137,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", peerDescription, certIdentities); - authorizer.authorize(certIdentities); + peerDescription = authorizer.authorize(certIdentities); logger.debug("Client Identities {} are authorized to load balance data", certIdentities); } @@ -155,7 +152,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { return; } - receiveFlowFiles(in, out, peerDescription, version, socket.getInetAddress().getHostName()); + receiveFlowFiles(in, out, peerDescription, version); } private Set getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException { @@ -225,7 +222,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } - protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion, final String nodeName) throws IOException { + protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion) throws IOException { logger.debug("Receiving FlowFiles from {}", peerDescription); final long startTimestamp = System.currentTimeMillis(); @@ -309,7 +306,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size()); - completeTransaction(in, out, peerDescription, flowFilesReceived, nodeName, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue); + completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue); } catch (final Exception e) { // If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because // they are no longer needed. @@ -324,7 +321,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } private void completeTransaction(final InputStream in, final OutputStream out, final String peerDescription, final List flowFilesReceived, - final String nodeName, final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException { + final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException { final int completionIndicator = in.read(); if (completionIndicator < 0) { throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF"); @@ -343,7 +340,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription); - registerReceiveProvenanceEvents(flowFilesReceived, nodeName, connectionId, startTimestamp); + registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp); updateFlowFileRepository(flowFilesReceived, flowFileQueue); transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java index 17e9237706..4871d72d82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java @@ -101,7 +101,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class LoadBalancedQueueIT { - private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {}; + private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next(); private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> { throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized"); }; @@ -269,7 +269,7 @@ public class LoadBalancedQueueIT { } } - final int totalFlowFileCount = 6; + final int totalFlowFileCount = 7; // Wait up to 10 seconds for the server's FlowFile Repository to be updated final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java index efa5d738ea..20b6add055 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java @@ -122,7 +122,7 @@ public class TestLoadBalanceSession { final FlowFileContentAccess contentAccess = contentMap::get; final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false, - flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); + flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port)); @@ -209,7 +209,7 @@ public class TestLoadBalanceSession { final FlowFileContentAccess contentAccess = contentMap::get; final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false, - flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); + flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index d020c12f55..94f992f0a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -78,7 +78,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; public class TestStandardLoadBalanceProtocol { - private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {}; + private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next(); private FlowFileRepository flowFileRepo; private ContentRepository contentRepo; private ProvenanceRepository provenanceRepo; @@ -204,7 +204,7 @@ public class TestStandardLoadBalanceProtocol { dos.writeLong(checksum.getValue()); dos.write(COMPLETE_TRANSACTION); - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); final byte[] serverResponse = serverOutput.toByteArray(); assertEquals(3, serverResponse.length); @@ -266,7 +266,7 @@ public class TestStandardLoadBalanceProtocol { dos.writeLong(checksum.getValue()); dos.write(COMPLETE_TRANSACTION); - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); final byte[] serverResponse = serverOutput.toByteArray(); assertEquals(3, serverResponse.length); @@ -331,7 +331,7 @@ public class TestStandardLoadBalanceProtocol { dos.writeLong(checksum.getValue()); dos.write(COMPLETE_TRANSACTION); - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); final byte[] serverResponse = serverOutput.toByteArray(); assertEquals(2, serverResponse.length); @@ -393,7 +393,7 @@ public class TestStandardLoadBalanceProtocol { dos.close(); try { - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); Assert.fail("Expected EOFException but none was thrown"); } catch (final EOFException eof) { // expected @@ -440,7 +440,7 @@ public class TestStandardLoadBalanceProtocol { dos.write(COMPLETE_TRANSACTION); try { - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); Assert.fail("Expected TransactionAbortedException but none was thrown"); } catch (final TransactionAbortedException e) { // expected @@ -492,7 +492,7 @@ public class TestStandardLoadBalanceProtocol { dos.close(); try { - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); Assert.fail("Expected EOFException but none was thrown"); } catch (final EOFException e) { // expected @@ -541,7 +541,7 @@ public class TestStandardLoadBalanceProtocol { dos.write(ABORT_TRANSACTION); try { - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); Assert.fail("Expected TransactionAbortedException but none was thrown"); } catch (final TransactionAbortedException e) { // expected @@ -590,7 +590,7 @@ public class TestStandardLoadBalanceProtocol { dos.writeLong(checksum.getValue()); dos.write(COMPLETE_TRANSACTION); - protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); final byte[] serverResponse = serverOutput.toByteArray(); assertEquals(3, serverResponse.length);