diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index 57c172321d..d82ea5ff23 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@ -43,6 +43,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -68,19 +69,31 @@ import org.apache.nifi.processor.util.StandardValidators; + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties" + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name " + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") -@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a " +@DynamicProperty(name = "The relationship name (positive number)", value = "The relationship Weight (positive number)", description = "Adding a " + "property with the name '5' and value '10' means that the relationship with name " - + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") + + "'5' will receive 10 FlowFiles in each iteration instead of 1.") @DynamicRelationship(name = "A number 1..", description = "FlowFiles are sent to this relationship per the " + "") @WritesAttributes( - @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through") + @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the FlowFile has been routed through") ) public class DistributeLoad extends AbstractProcessor { - public static final String STRATEGY_ROUND_ROBIN = "round robin"; - public static final String STRATEGY_NEXT_AVAILABLE = "next available"; - public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service"; + public static final String ROUND_ROBIN = "round robin"; + public static final String NEXT_AVAILABLE = "next available"; + public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service"; + public static final String OVERFLOW = "overflow"; + + public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN, + "Relationship selection is evenly distributed in a round robin fashion; all relationships must be available."); + public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE, + "Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available."); + public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE, + "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available."); + public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW, + "Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available."); + + public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder() .name("Number of Relationships") @@ -91,12 +104,10 @@ public class DistributeLoad extends AbstractProcessor { .build(); public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder() .name("Distribution Strategy") - .description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all " - + "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 " - + "destination can accept FlowFiles.") + .description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.") .required(true) - .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE) - .defaultValue(STRATEGY_ROUND_ROBIN) + .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW) + .defaultValue(ROUND_ROBIN) .build(); public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder() .name("Hostnames") @@ -129,7 +140,7 @@ public class DistributeLoad extends AbstractProcessor { private List properties; private final AtomicReference> relationshipsRef = new AtomicReference<>(); - private final AtomicReference strategyRef = new AtomicReference(new RoundRobinStrategy()); + private final AtomicReference strategyRef = new AtomicReference<>(new RoundRobinStrategy()); private final AtomicReference> weightedRelationshipListRef = new AtomicReference<>(); private final AtomicBoolean doCustomValidate = new AtomicBoolean(false); private volatile LoadDistributionListener myListener; @@ -167,14 +178,20 @@ public class DistributeLoad extends AbstractProcessor { this.relationshipsRef.set(Collections.unmodifiableSet(relationships)); } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) { switch (newValue.toLowerCase()) { - case STRATEGY_ROUND_ROBIN: + case ROUND_ROBIN: strategyRef.set(new RoundRobinStrategy()); break; - case STRATEGY_NEXT_AVAILABLE: + case NEXT_AVAILABLE: strategyRef.set(new NextAvailableStrategy()); break; - case STRATEGY_LOAD_DISTRIBUTION_SERVICE: + case LOAD_DISTRIBUTION_SERVICE: strategyRef.set(new LoadDistributionStrategy()); + break; + case OVERFLOW: + strategyRef.set(new OverflowStrategy()); + break; + default: + throw new IllegalStateException("Invalid distribution strategy"); } doSetProps.set(true); doCustomValidate.set(true); @@ -222,7 +239,7 @@ public class DistributeLoad extends AbstractProcessor { Collection results = new ArrayList<>(); if (doCustomValidate.getAndSet(false)) { String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue(); - if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) { // make sure Hostnames and Controller service are set PropertyValue propDesc = validationContext.getProperty(HOSTNAMES); if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { @@ -233,7 +250,7 @@ public class DistributeLoad extends AbstractProcessor { if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { results.add(new ValidationResult.Builder() .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()) - .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'") + .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy") .valid(false).build()); } if (results.isEmpty()) { @@ -275,7 +292,7 @@ public class DistributeLoad extends AbstractProcessor { final Map weightings = new LinkedHashMap<>(); String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue(); - if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) { String hostNamesValue = context.getProperty(HOSTNAMES).getValue(); String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); Set hostNameSet = new HashSet<>(); @@ -392,7 +409,7 @@ public class DistributeLoad extends AbstractProcessor { /** * Implementations must be thread-safe. */ - private static interface DistributionStrategy { + private interface DistributionStrategy { /** * @param context context @@ -491,4 +508,35 @@ public class DistributeLoad extends AbstractProcessor { return false; } } + + private class OverflowStrategy implements DistributionStrategy { + + @Override + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final int numRelationships = relationshipList.size(); + + boolean foundFreeRelationship = false; + Relationship relationship = null; + // Getting set of available relationships only once. This may miss a relationship that recently became available, but + // overall is more efficient than re-calling for every relationship evaluation + Set availableRelationships = context.getAvailableRelationships(); + + int weightedIndex = 0; + while (!foundFreeRelationship) { + relationship = relationshipList.get(weightedIndex); + foundFreeRelationship = availableRelationships.contains(relationship); + if (++weightedIndex % numRelationships == 0 && !foundFreeRelationship) { + return null; + } + } + + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return false; + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java index 975858a47d..8de24e981b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java @@ -123,7 +123,7 @@ public class TestDistributeLoad { final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100"); - testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE); + testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE.getValue()); for (int i = 0; i < 99; i++) { testRunner.enqueue(new byte[0]); @@ -162,4 +162,48 @@ public class TestDistributeLoad { assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE)); } } + + @Test + public void testOverflow() { + final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); + + testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "3"); + testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_OVERFLOW.getValue()); + + // Queue all FlowFiles required for this test + for (int i = 0; i < 8; i++) { + testRunner.enqueue(new byte[0]); + } + + // Repeatedly send to highest weighted relationship as long as it is available + testRunner.run(2, false); + testRunner.assertTransferCount("1", 2); + testRunner.assertTransferCount("2", 0); + testRunner.assertTransferCount("3", 0); + + // When highest weighted relationship becomes unavailable, repeatedly send to next-highest weighted relationship + testRunner.clearTransferState(); + testRunner.setRelationshipUnavailable("1"); + testRunner.run(2, false); + testRunner.assertTransferCount("1", 0); + testRunner.assertTransferCount("2", 2); + testRunner.assertTransferCount("3", 0); + + // Return to highest weighted relationship when it becomes available again + testRunner.clearTransferState(); + testRunner.setRelationshipAvailable("1"); + testRunner.run(2, false); + testRunner.assertTransferCount("1", 2); + testRunner.assertTransferCount("2", 0); + testRunner.assertTransferCount("3", 0); + + // Skip ahead and repeatedly send to the first available relationship when multiple relationships are unavailable + testRunner.clearTransferState(); + testRunner.setRelationshipUnavailable("1"); + testRunner.setRelationshipUnavailable("2"); + testRunner.run(2, false); + testRunner.assertTransferCount("1", 0); + testRunner.assertTransferCount("2", 0); + testRunner.assertTransferCount("3", 2); + } }