diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java index bc730d8482..b9df55e028 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java @@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -156,12 +158,24 @@ public class PopularVoteFlowElection implements FlowElection { return null; } + final List nonEmptyCandidates = candidateByFingerprint.values().stream() + .filter(candidate -> !candidate.isFlowEmpty()) + .collect(Collectors.toList()); + + if (nonEmptyCandidates.isEmpty()) { + // All flow candidates are empty flows. Just use one of them. + final FlowCandidate electedCandidate = candidateByFingerprint.values().iterator().next(); + this.electedDataFlow = electedCandidate.getDataFlow(); + return electedCandidate; + } + final FlowCandidate elected; - if (candidateByFingerprint.size() == 1) { - elected = candidateByFingerprint.values().iterator().next(); + if (nonEmptyCandidates.size() == 1) { + // Only one flow is non-empty. Use that one. + elected = nonEmptyCandidates.iterator().next(); } else { - elected = candidateByFingerprint.values().stream() - .filter(candidate -> !candidate.isFlowEmpty()) // We have more than 1 fingerprint. Do not consider empty flows. + // Choose the non-empty flow that got the most votes. + elected = nonEmptyCandidates.stream() .max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes())) .get(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index c01371db73..b7f9e82975 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -34,6 +35,8 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.fingerprint.FingerprintFactory; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestPopularVoteFlowElection { @@ -62,6 +65,43 @@ public class TestPopularVoteFlowElection { assertEquals(new String(flow), new String(electedDataFlow.getFlow())); } + @Test + public void testDifferentEmptyFlows() throws IOException { + final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class); + Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenAnswer(new Answer() { + @Override + public String answer(final InvocationOnMock invocation) throws Throwable { + final byte[] flow = invocation.getArgumentAt(0, byte[].class); + final String xml = new String(flow); + + // Return the ID of the root group as the fingerprint. + final String fingerprint = xml.replaceAll("(?s:(.*)(.*?)(.*))", "$2"); + return fingerprint; + } + }); + + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory); + final byte[] flow1 = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml")); + final byte[] flow2 = Files.readAllBytes(Paths.get("src/test/resources/conf/different-empty-flow.xml")); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + assertNull(election.castVote(createDataFlow(flow1), createNodeId(1))); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + assertNull(election.castVote(createDataFlow(flow1), createNodeId(2))); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow electedDataFlow = election.castVote(createDataFlow(flow2), createNodeId(3)); + assertNotNull(electedDataFlow); + + final String electedFlowXml = new String(electedDataFlow.getFlow()); + assertTrue(new String(flow1).equals(electedFlowXml) || new String(flow2).equals(electedFlowXml)); + } + @Test public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml new file mode 100644 index 0000000000..8c9641ae25 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml @@ -0,0 +1,27 @@ + + + + 10 + 5 + + 11111111-1111-1111-1111-111111111111 + Empty NiFi Flow + + + + + + \ No newline at end of file