NIFI-5746: Use Node Identifier's node address instead of getting from socket for RECEIVE prov events; make SEND prov events match syntax of RECEIVE prov events

NIFI-5746: Addressed issue found in review process

This closes #3109.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mark Payne 2018-10-24 13:45:55 -04:00 committed by Koji Kawamura
parent 2201f7746f
commit c7ff2fc5db
9 changed files with 43 additions and 36 deletions

View File

@ -17,10 +17,11 @@
package org.apache.nifi.controller.queue.clustered.client.async;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.FlowFileRecord;
import java.util.List;
public interface TransactionCompleteCallback {
void onTransactionComplete(List<FlowFileRecord> flowFilesSent);
void onTransactionComplete(List<FlowFileRecord> flowFilesSent, NodeIdentifier nodeIdentifier);
}

View File

@ -256,7 +256,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
} while (success);
if (loadBalanceSession.isComplete()) {
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent());
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier);
}
return anySuccess;

View File

@ -173,7 +173,8 @@ public class RemoteQueuePartition implements QueuePartition {
final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, flowFile);
repoRecord.markForAbort();
updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord));
// We can send 'null' for the node identifier only because the list of FlowFiles sent is empty.
updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord), null);
// If unable to even connect to the node, go ahead and transfer all FlowFiles for this queue to the failure destination.
// In either case, transfer those FlowFiles that we failed to send.
@ -204,12 +205,12 @@ public class RemoteQueuePartition implements QueuePartition {
final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() {
@Override
public void onTransactionComplete(final List<FlowFileRecord> flowFilesSent) {
public void onTransactionComplete(final List<FlowFileRecord> flowFilesSent, final NodeIdentifier nodeIdentifier) {
// We've now completed the transaction. We must now update the repositories and "keep the books", acknowledging the FlowFiles
// with the queue so that its size remains accurate.
priorityQueue.acknowledge(flowFilesSent);
flowFileQueue.onTransfer(flowFilesSent);
updateRepositories(flowFilesSent, Collections.emptyList());
updateRepositories(flowFilesSent, Collections.emptyList(), nodeIdentifier);
}
};
@ -230,7 +231,7 @@ public class RemoteQueuePartition implements QueuePartition {
* @param flowFilesSent the FlowFiles that were sent to another node.
* @param abortedRecords the Repository Records for any FlowFile whose content was missing.
*/
private void updateRepositories(final List<FlowFileRecord> flowFilesSent, final Collection<RepositoryRecord> abortedRecords) {
private void updateRepositories(final List<FlowFileRecord> flowFilesSent, final Collection<RepositoryRecord> abortedRecords, final NodeIdentifier nodeIdentifier) {
// We update the Provenance Repository first. This way, even if we restart before we update the FlowFile repo, we have the record
// that the data was sent in the Provenance Repository. We then update the content claims and finally the FlowFile Repository. We do it
// in this order so that when the FlowFile repo is sync'ed to disk, we know which Content Claims are no longer in use. Updating the FlowFile
@ -242,7 +243,7 @@ public class RemoteQueuePartition implements QueuePartition {
// are ever created.
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size());
for (final FlowFileRecord sent : flowFilesSent) {
provenanceEvents.add(createSendEvent(sent));
provenanceEvents.add(createSendEvent(sent, nodeIdentifier));
provenanceEvents.add(createDropEvent(sent));
}
@ -279,7 +280,7 @@ public class RemoteQueuePartition implements QueuePartition {
return record;
}
private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile) {
private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile, final NodeIdentifier nodeIdentifier) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
.fromFlowFile(flowFile)
@ -289,7 +290,7 @@ public class RemoteQueuePartition implements QueuePartition {
.setComponentType("Connection")
.setSourceQueueIdentifier(flowFileQueue.getIdentifier())
.setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key()))
.setTransitUri("nifi:connection:" + flowFileQueue.getIdentifier());
.setTransitUri("nifi://" + nodeIdentifier.getApiAddress() + "/loadbalance/" + flowFileQueue.getIdentifier());
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {

View File

@ -40,10 +40,10 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
}
@Override
public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
public String authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
if (clientIdentities == null) {
logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
return;
return null;
}
final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
@ -53,7 +53,7 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
for (final String clientId : clientIdentities) {
if (nodeIds.contains(clientId)) {
logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId);
return;
return clientId;
}
}

View File

@ -20,5 +20,13 @@ package org.apache.nifi.controller.queue.clustered.server;
import java.util.Collection;
public interface LoadBalanceAuthorizer {
void authorize(Collection<String> clientIdentities) throws NotAuthorizedException;
/**
* Checks if any of the given identities is allowed to load balance data. If so, the identity that has been
* permitted is returned. If not, a NotAuthorizedException is thrown.
*
* @param clientIdentities the collection of identities to check
* @return the identity that is authorized, or null if the given collection of identities is null
* @throws NotAuthorizedException if none of the given identities is authorized to load balance data
*/
String authorize(Collection<String> clientIdentities) throws NotAuthorizedException;
}

View File

@ -123,16 +123,13 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
final InputStream in = new BufferedInputStream(socket.getInputStream());
final OutputStream out = new BufferedOutputStream(socket.getOutputStream());
String peerDescription = socket.getInetAddress().toString();
String peerDescription = socket.getInetAddress().getHostName();
if (socket instanceof SSLSocket) {
final SSLSession sslSession = ((SSLSocket) socket).getSession();
final Set<String> certIdentities;
try {
certIdentities = getCertificateIdentities(sslSession);
final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
peerDescription = CertificateUtils.extractUsername(dn);
} catch (final CertificateException e) {
throw new IOException("Failed to extract Client Certificate", e);
}
@ -140,7 +137,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'",
peerDescription, certIdentities);
authorizer.authorize(certIdentities);
peerDescription = authorizer.authorize(certIdentities);
logger.debug("Client Identities {} are authorized to load balance data", certIdentities);
}
@ -155,7 +152,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
return;
}
receiveFlowFiles(in, out, peerDescription, version, socket.getInetAddress().getHostName());
receiveFlowFiles(in, out, peerDescription, version);
}
private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
@ -225,7 +222,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion, final String nodeName) throws IOException {
protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion) throws IOException {
logger.debug("Receiving FlowFiles from {}", peerDescription);
final long startTimestamp = System.currentTimeMillis();
@ -309,7 +306,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
completeTransaction(in, out, peerDescription, flowFilesReceived, nodeName, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
} catch (final Exception e) {
// If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
// they are no longer needed.
@ -324,7 +321,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
private void completeTransaction(final InputStream in, final OutputStream out, final String peerDescription, final List<RemoteFlowFileRecord> flowFilesReceived,
final String nodeName, final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException {
final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException {
final int completionIndicator = in.read();
if (completionIndicator < 0) {
throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF");
@ -343,7 +340,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription);
registerReceiveProvenanceEvents(flowFilesReceived, nodeName, connectionId, startTimestamp);
registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp);
updateFlowFileRepository(flowFilesReceived, flowFileQueue);
transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);

View File

@ -101,7 +101,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class LoadBalancedQueueIT {
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
};
@ -269,7 +269,7 @@ public class LoadBalancedQueueIT {
}
}
final int totalFlowFileCount = 6;
final int totalFlowFileCount = 7;
// Wait up to 10 seconds for the server's FlowFile Repository to be updated
final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);

View File

@ -122,7 +122,7 @@ public class TestLoadBalanceSession {
final FlowFileContentAccess contentAccess = contentMap::get;
final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false,
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port));
@ -209,7 +209,7 @@ public class TestLoadBalanceSession {
final FlowFileContentAccess contentAccess = contentMap::get;
final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false,
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port));

View File

@ -78,7 +78,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class TestStandardLoadBalanceProtocol {
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
private FlowFileRepository flowFileRepo;
private ContentRepository contentRepo;
private ProvenanceRepository provenanceRepo;
@ -204,7 +204,7 @@ public class TestStandardLoadBalanceProtocol {
dos.writeLong(checksum.getValue());
dos.write(COMPLETE_TRANSACTION);
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
final byte[] serverResponse = serverOutput.toByteArray();
assertEquals(3, serverResponse.length);
@ -266,7 +266,7 @@ public class TestStandardLoadBalanceProtocol {
dos.writeLong(checksum.getValue());
dos.write(COMPLETE_TRANSACTION);
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
final byte[] serverResponse = serverOutput.toByteArray();
assertEquals(3, serverResponse.length);
@ -331,7 +331,7 @@ public class TestStandardLoadBalanceProtocol {
dos.writeLong(checksum.getValue());
dos.write(COMPLETE_TRANSACTION);
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
final byte[] serverResponse = serverOutput.toByteArray();
assertEquals(2, serverResponse.length);
@ -393,7 +393,7 @@ public class TestStandardLoadBalanceProtocol {
dos.close();
try {
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
Assert.fail("Expected EOFException but none was thrown");
} catch (final EOFException eof) {
// expected
@ -440,7 +440,7 @@ public class TestStandardLoadBalanceProtocol {
dos.write(COMPLETE_TRANSACTION);
try {
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
Assert.fail("Expected TransactionAbortedException but none was thrown");
} catch (final TransactionAbortedException e) {
// expected
@ -492,7 +492,7 @@ public class TestStandardLoadBalanceProtocol {
dos.close();
try {
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
Assert.fail("Expected EOFException but none was thrown");
} catch (final EOFException e) {
// expected
@ -541,7 +541,7 @@ public class TestStandardLoadBalanceProtocol {
dos.write(ABORT_TRANSACTION);
try {
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
Assert.fail("Expected TransactionAbortedException but none was thrown");
} catch (final TransactionAbortedException e) {
// expected
@ -590,7 +590,7 @@ public class TestStandardLoadBalanceProtocol {
dos.writeLong(checksum.getValue());
dos.write(COMPLETE_TRANSACTION);
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test");
protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
final byte[] serverResponse = serverOutput.toByteArray();
assertEquals(3, serverResponse.length);