NIFI-8962: Add overflow strategy to DistributeLoad (#5267)

This commit is contained in:
markobean 2022-01-07 14:45:18 -05:00 committed by GitHub
parent 109e4d1c95
commit 42626adab8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 20 deletions

View File

@ -43,6 +43,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@ -68,19 +69,31 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
+ "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a "
@DynamicProperty(name = "The relationship name (positive number)", value = "The relationship Weight (positive number)", description = "Adding a "
+ "property with the name '5' and value '10' means that the relationship with name "
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+ "'5' will receive 10 FlowFiles in each iteration instead of 1.")
@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
+ "<Distribution Strategy>")
@WritesAttributes(
@WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
@WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the FlowFile has been routed through")
)
public class DistributeLoad extends AbstractProcessor {
public static final String STRATEGY_ROUND_ROBIN = "round robin";
public static final String STRATEGY_NEXT_AVAILABLE = "next available";
public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final String ROUND_ROBIN = "round robin";
public static final String NEXT_AVAILABLE = "next available";
public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final String OVERFLOW = "overflow";
public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
"Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
"Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
"Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
"Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
.name("Number of Relationships")
@ -91,12 +104,10 @@ public class DistributeLoad extends AbstractProcessor {
.build();
public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
.name("Distribution Strategy")
.description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all "
+ "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 "
+ "destination can accept FlowFiles.")
.description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
.required(true)
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
.defaultValue(STRATEGY_ROUND_ROBIN)
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW)
.defaultValue(ROUND_ROBIN)
.build();
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
@ -129,7 +140,7 @@ public class DistributeLoad extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy());
private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
private volatile LoadDistributionListener myListener;
@ -167,14 +178,20 @@ public class DistributeLoad extends AbstractProcessor {
this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
} else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
switch (newValue.toLowerCase()) {
case STRATEGY_ROUND_ROBIN:
case ROUND_ROBIN:
strategyRef.set(new RoundRobinStrategy());
break;
case STRATEGY_NEXT_AVAILABLE:
case NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy());
break;
case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
case LOAD_DISTRIBUTION_SERVICE:
strategyRef.set(new LoadDistributionStrategy());
break;
case OVERFLOW:
strategyRef.set(new OverflowStrategy());
break;
default:
throw new IllegalStateException("Invalid distribution strategy");
}
doSetProps.set(true);
doCustomValidate.set(true);
@ -222,7 +239,7 @@ public class DistributeLoad extends AbstractProcessor {
Collection<ValidationResult> results = new ArrayList<>();
if (doCustomValidate.getAndSet(false)) {
String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
// make sure Hostnames and Controller service are set
PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
@ -233,7 +250,7 @@ public class DistributeLoad extends AbstractProcessor {
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
.explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
.explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
.valid(false).build());
}
if (results.isEmpty()) {
@ -275,7 +292,7 @@ public class DistributeLoad extends AbstractProcessor {
final Map<Integer, Integer> weightings = new LinkedHashMap<>();
String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
Set<String> hostNameSet = new HashSet<>();
@ -392,7 +409,7 @@ public class DistributeLoad extends AbstractProcessor {
/**
* Implementations must be thread-safe.
*/
private static interface DistributionStrategy {
private interface DistributionStrategy {
/**
* @param context context
@ -491,4 +508,35 @@ public class DistributeLoad extends AbstractProcessor {
return false;
}
}
private class OverflowStrategy implements DistributionStrategy {
@Override
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
boolean foundFreeRelationship = false;
Relationship relationship = null;
// Getting set of available relationships only once. This may miss a relationship that recently became available, but
// overall is more efficient than re-calling for every relationship evaluation
Set<Relationship> availableRelationships = context.getAvailableRelationships();
int weightedIndex = 0;
while (!foundFreeRelationship) {
relationship = relationshipList.get(weightedIndex);
foundFreeRelationship = availableRelationships.contains(relationship);
if (++weightedIndex % numRelationships == 0 && !foundFreeRelationship) {
return null;
}
}
return relationship;
}
@Override
public boolean requiresAllDestinationsAvailable() {
return false;
}
}
}

View File

@ -123,7 +123,7 @@ public class TestDistributeLoad {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE.getValue());
for (int i = 0; i < 99; i++) {
testRunner.enqueue(new byte[0]);
@ -162,4 +162,48 @@ public class TestDistributeLoad {
assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
}
}
@Test
public void testOverflow() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "3");
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_OVERFLOW.getValue());
// Queue all FlowFiles required for this test
for (int i = 0; i < 8; i++) {
testRunner.enqueue(new byte[0]);
}
// Repeatedly send to highest weighted relationship as long as it is available
testRunner.run(2, false);
testRunner.assertTransferCount("1", 2);
testRunner.assertTransferCount("2", 0);
testRunner.assertTransferCount("3", 0);
// When highest weighted relationship becomes unavailable, repeatedly send to next-highest weighted relationship
testRunner.clearTransferState();
testRunner.setRelationshipUnavailable("1");
testRunner.run(2, false);
testRunner.assertTransferCount("1", 0);
testRunner.assertTransferCount("2", 2);
testRunner.assertTransferCount("3", 0);
// Return to highest weighted relationship when it becomes available again
testRunner.clearTransferState();
testRunner.setRelationshipAvailable("1");
testRunner.run(2, false);
testRunner.assertTransferCount("1", 2);
testRunner.assertTransferCount("2", 0);
testRunner.assertTransferCount("3", 0);
// Skip ahead and repeatedly send to the first available relationship when multiple relationships are unavailable
testRunner.clearTransferState();
testRunner.setRelationshipUnavailable("1");
testRunner.setRelationshipUnavailable("2");
testRunner.run(2, false);
testRunner.assertTransferCount("1", 0);
testRunner.assertTransferCount("2", 0);
testRunner.assertTransferCount("3", 2);
}
}