NIFI-1966: Recreated issue that is outlined in JIRA (the reason for re-opening the ticket) that results in 'java.util.NoSuchElementException: No value present' in unit test - Resolved issue where two flows that are both empty but have different fingerprints (due to root group id being different) causes vote election to fail

This closes #995.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-09-07 21:07:01 -04:00 committed by Bryan Bende
parent d36b76cc60
commit bc7c42efa5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 85 additions and 4 deletions

View File

@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -156,12 +158,24 @@ public class PopularVoteFlowElection implements FlowElection {
return null; return null;
} }
final List<FlowCandidate> nonEmptyCandidates = candidateByFingerprint.values().stream()
.filter(candidate -> !candidate.isFlowEmpty())
.collect(Collectors.toList());
if (nonEmptyCandidates.isEmpty()) {
// All flow candidates are empty flows. Just use one of them.
final FlowCandidate electedCandidate = candidateByFingerprint.values().iterator().next();
this.electedDataFlow = electedCandidate.getDataFlow();
return electedCandidate;
}
final FlowCandidate elected; final FlowCandidate elected;
if (candidateByFingerprint.size() == 1) { if (nonEmptyCandidates.size() == 1) {
elected = candidateByFingerprint.values().iterator().next(); // Only one flow is non-empty. Use that one.
elected = nonEmptyCandidates.iterator().next();
} else { } else {
elected = candidateByFingerprint.values().stream() // Choose the non-empty flow that got the most votes.
.filter(candidate -> !candidate.isFlowEmpty()) // We have more than 1 fingerprint. Do not consider empty flows. elected = nonEmptyCandidates.stream()
.max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes())) .max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes()))
.get(); .get();
} }

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -34,6 +35,8 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.fingerprint.FingerprintFactory; import org.apache.nifi.fingerprint.FingerprintFactory;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestPopularVoteFlowElection { public class TestPopularVoteFlowElection {
@ -62,6 +65,43 @@ public class TestPopularVoteFlowElection {
assertEquals(new String(flow), new String(electedDataFlow.getFlow())); assertEquals(new String(flow), new String(electedDataFlow.getFlow()));
} }
@Test
public void testDifferentEmptyFlows() throws IOException {
final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class);
Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenAnswer(new Answer<String>() {
@Override
public String answer(final InvocationOnMock invocation) throws Throwable {
final byte[] flow = invocation.getArgumentAt(0, byte[].class);
final String xml = new String(flow);
// Return the ID of the root group as the fingerprint.
final String fingerprint = xml.replaceAll("(?s:(.*<id>)(.*?)(</id>.*))", "$2");
return fingerprint;
}
});
final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory);
final byte[] flow1 = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
final byte[] flow2 = Files.readAllBytes(Paths.get("src/test/resources/conf/different-empty-flow.xml"));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
assertNull(election.castVote(createDataFlow(flow1), createNodeId(1)));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
assertNull(election.castVote(createDataFlow(flow1), createNodeId(2)));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
final DataFlow electedDataFlow = election.castVote(createDataFlow(flow2), createNodeId(3));
assertNotNull(electedDataFlow);
final String electedFlowXml = new String(electedDataFlow.getFlow());
assertTrue(new String(flow1).equals(electedFlowXml) || new String(flow2).equals(electedFlowXml));
}
@Test @Test
public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException { public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException {

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController encoding-version="1.0">
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>11111111-1111-1111-1111-111111111111</id>
<name>Empty NiFi Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
</rootGroup>
<controllerServices/>
<reportingTasks/>
</flowController>