NIFI-6919: Added relationship attribute to DistributeLoad

NIFI-6919: Cleaned up docs

NIFI-6919: Cleanup

NIFI-6919: Cleanup

NIFI-6919: added negative unit test

NIFI-6919: Removed unnecesary feature flag

Updated attribute description

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3939
This commit is contained in:
Michael Hogue 2019-12-12 21:06:15 -05:00 committed by Matthew Burgess
parent 824cc0ed77
commit 421bfdd5ff
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 53 additions and 22 deletions

View File

@ -31,14 +31,16 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@ -71,6 +73,9 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
+ "<Distribution Strategy>")
@WritesAttributes(
@WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
)
public class DistributeLoad extends AbstractProcessor {
public static final String STRATEGY_ROUND_ROBIN = "round robin";
@ -93,31 +98,26 @@ public class DistributeLoad extends AbstractProcessor {
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
.defaultValue(STRATEGY_ROUND_ROBIN)
.build();
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
.description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
.required(true)
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
if (null == input) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need to specify delimited list of FQDNs").build();
return result;
}
String[] hostNames = input.split("(?:,+|;+|\\s+)");
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need a FQDN rather than a simple host name.").build();
return result;
}
}
.addValidator((subject, input, context) -> {
ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
if (null == input) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need to specify delimited list of FQDNs").build();
return result;
}
String[] hostNames = input.split("(?:,+|;+|\\s+)");
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need a FQDN rather than a simple host name.").build();
return result;
}
}
return result;
}).build();
public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
.name("Load Distribution Service ID")
@ -125,6 +125,7 @@ public class DistributeLoad extends AbstractProcessor {
.required(true)
.identifiesControllerService(LoadDistributionService.class)
.build();
public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
@ -365,6 +366,9 @@ public class DistributeLoad extends AbstractProcessor {
return;
}
// add an attribute capturing which relationship a flowfile was routed through
session.putAttribute(flowFile, RELATIONSHIP_ATTRIBUTE, relationship.getName());
session.transfer(flowFile, relationship);
session.getProvenanceReporter().route(flowFile, relationship);
}

View File

@ -16,9 +16,13 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -135,4 +139,27 @@ public class TestDistributeLoad {
testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
}
}
@Test
public void testFlowFileAttributesAdded() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY, DistributeLoad.STRATEGY_NEXT_AVAILABLE);
for (int i = 0; i < 100; i++) {
testRunner.enqueue(new byte[0]);
}
testRunner.run(101);
testRunner.assertQueueEmpty();
for (int i = 1; i <= 100; i++) {
testRunner.assertTransferCount(String.valueOf(i), 1);
final List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(String.valueOf(i));
assertEquals(1, flowFilesForRelationship.size());
final MockFlowFile mockFlowFile = flowFilesForRelationship.get(0);
assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
}
}
}