NIFI-10950 DistributeLoad processor - this closes #6924. removed Load Distribution

Service

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Nissim Shiman 2023-01-20 20:06:42 +00:00 committed by Joe Witt
parent 471cccbbda
commit dbef536ebd
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
8 changed files with 12 additions and 289 deletions

View File

@ -265,11 +265,6 @@
<artifactId>nifi-socket-utils</artifactId> <artifactId>nifi-socket-utils</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-load-distribution-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>javax.jms</groupId> <groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId> <artifactId>javax.jms-api</artifactId>

View File

@ -17,19 +17,16 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DefaultRunDuration; import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship; import org.apache.nifi.annotation.behavior.DynamicRelationship;
@ -46,13 +43,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.loading.LoadDistributionListener;
import org.apache.nifi.loading.LoadDistributionService;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -82,15 +76,12 @@ public class DistributeLoad extends AbstractProcessor {
public static final String ROUND_ROBIN = "round robin"; public static final String ROUND_ROBIN = "round robin";
public static final String NEXT_AVAILABLE = "next available"; public static final String NEXT_AVAILABLE = "next available";
public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final String OVERFLOW = "overflow"; public static final String OVERFLOW = "overflow";
public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN, public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
"Relationship selection is evenly distributed in a round robin fashion; all relationships must be available."); "Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE, public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
"Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available."); "Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
"Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW, public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
"Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available."); "Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
@ -107,44 +98,15 @@ public class DistributeLoad extends AbstractProcessor {
.name("Distribution Strategy") .name("Distribution Strategy")
.description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.") .description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
.required(true) .required(true)
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW) .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_OVERFLOW)
.defaultValue(ROUND_ROBIN) .defaultValue(ROUND_ROBIN)
.build(); .build();
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
.description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
.required(true)
.addValidator((subject, input, context) -> {
ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
if (null == input) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need to specify delimited list of FQDNs").build();
return result;
}
String[] hostNames = input.split("(?:,+|;+|\\s+)");
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
.explanation("Need a FQDN rather than a simple host name.").build();
return result;
}
}
return result;
}).build();
public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
.name("Load Distribution Service ID")
.description("The identifier of the Load Distribution Service")
.required(true)
.identifiesControllerService(LoadDistributionService.class)
.build();
public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship"; public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(); private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy()); private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy());
private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>(); private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
private volatile LoadDistributionListener myListener;
private final AtomicBoolean doSetProps = new AtomicBoolean(true); private final AtomicBoolean doSetProps = new AtomicBoolean(true);
@Override @Override
@ -185,9 +147,6 @@ public class DistributeLoad extends AbstractProcessor {
case NEXT_AVAILABLE: case NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy()); strategyRef.set(new NextAvailableStrategy());
break; break;
case LOAD_DISTRIBUTION_SERVICE:
strategyRef.set(new LoadDistributionStrategy());
break;
case OVERFLOW: case OVERFLOW:
strategyRef.set(new OverflowStrategy()); strategyRef.set(new OverflowStrategy());
break; break;
@ -195,18 +154,12 @@ public class DistributeLoad extends AbstractProcessor {
throw new IllegalStateException("Invalid distribution strategy"); throw new IllegalStateException("Invalid distribution strategy");
} }
doSetProps.set(true); doSetProps.set(true);
doCustomValidate.set(true);
} }
} }
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) { if (doSetProps.getAndSet(false)) {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
props.add(HOSTNAMES);
this.properties = Collections.unmodifiableList(props);
} else if (doSetProps.getAndSet(false)) {
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
props.add(NUM_RELATIONSHIPS); props.add(NUM_RELATIONSHIPS);
props.add(DISTRIBUTION_STRATEGY); props.add(DISTRIBUTION_STRATEGY);
@ -235,113 +188,22 @@ public class DistributeLoad extends AbstractProcessor {
.name(propertyDescriptorName).dynamic(true).build(); .name(propertyDescriptorName).dynamic(true).build();
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Collection<ValidationResult> results = new ArrayList<>();
if (doCustomValidate.getAndSet(false)) {
String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
// make sure Hostnames and Controller service are set
PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName())
.explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build());
}
propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
.explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
.valid(false).build());
}
if (results.isEmpty()) {
int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
int numHosts = 0;
for (String hostName : hostNames) {
if (StringUtils.isNotBlank(hostName)) {
hostNames[numHosts++] = hostName;
}
}
if (numHosts > numRels) {
results.add(new ValidationResult.Builder()
.subject("Number of Relationships and Hostnames")
.explanation("Number of Relationships must be equal to, or greater than, the number of host names")
.valid(false).build());
} else {
// create new relationships with descriptions of hostname
Set<Relationship> relsWithDesc = new TreeSet<>();
for (int i = 0; i < numHosts; i++) {
relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1))
.description(hostNames[i]).build());
}
// add add'l rels if configuration requires it...it probably shouldn't
for (int i = numHosts + 1; i <= numRels; i++) {
relsWithDesc.add(createRelationship(i));
}
relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
}
}
}
}
return results;
}
@OnScheduled @OnScheduled
public void createWeightedList(final ProcessContext context) { public void createWeightedList(final ProcessContext context) {
final Map<Integer, Integer> weightings = new LinkedHashMap<>(); final Map<Integer, Integer> weightings = new LinkedHashMap<>();
String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue(); final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) { for (int i = 1; i <= numRelationships; i++) {
String hostNamesValue = context.getProperty(HOSTNAMES).getValue(); weightings.put(i, 1);
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); }
Set<String> hostNameSet = new HashSet<>(); for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
for (String hostName : hostNames) { if (!this.properties.contains(propDesc)) {
if (StringUtils.isNotBlank(hostName)) { final int relationship = Integer.parseInt(propDesc.getName());
hostNameSet.add(hostName); final int weighting = context.getProperty(propDesc).asInteger();
} weightings.put(relationship, weighting);
}
LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
myListener = new LoadDistributionListener() {
@Override
public void update(Map<String, Integer> loadInfo) {
for (Relationship rel : relationshipsRef.get()) {
String hostname = rel.getDescription();
Integer weight = 1;
if (loadInfo.containsKey(hostname)) {
weight = loadInfo.get(hostname);
}
weightings.put(Integer.decode(rel.getName()), weight);
}
updateWeightedRelationships(weightings);
}
};
Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
for (Relationship rel : relationshipsRef.get()) {
String hostname = rel.getDescription();
Integer weight = 1;
if (loadInfo.containsKey(hostname)) {
weight = loadInfo.get(hostname);
}
weightings.put(Integer.decode(rel.getName()), weight);
}
} else {
final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
for (int i = 1; i <= numRelationships; i++) {
weightings.put(i, 1);
}
for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
if (!this.properties.contains(propDesc)) {
final int relationship = Integer.parseInt(propDesc.getName());
final int weighting = context.getProperty(propDesc).asInteger();
weightings.put(relationship, weighting);
}
} }
} }
updateWeightedRelationships(weightings); updateWeightedRelationships(weightings);
} }
@ -422,41 +284,6 @@ public class DistributeLoad extends AbstractProcessor {
boolean requiresAllDestinationsAvailable(); boolean requiresAllDestinationsAvailable();
} }
private class LoadDistributionStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
@Override
public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
final int numRelationships = relationshipList.size();
// create a HashSet that contains all of the available relationships, as calling #contains on HashSet
// is much faster than calling it on a List
boolean foundFreeRelationship = false;
Relationship relationship = null;
int attempts = 0;
while (!foundFreeRelationship) {
final long counterValue = counter.getAndIncrement();
final int idx = (int) (counterValue % numRelationships);
relationship = relationshipList.get(idx);
foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
return null;
}
}
return relationship;
}
@Override
public boolean requiresAllDestinationsAvailable() {
return false;
}
}
private class RoundRobinStrategy implements DistributionStrategy { private class RoundRobinStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L); private final AtomicLong counter = new AtomicLong(0L);

View File

@ -1,30 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-load-distribution-service-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.loading;
import java.util.Map;
public interface LoadDistributionListener {
public void update(Map<String, Integer> loadInfo);
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.loading;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.controller.ControllerService;
/**
* A service that will provide a Map of Fully Qualified Domain Names (fqdn) with
* their respective weights (scale of 1 - 100).
*/
public interface LoadDistributionService extends ControllerService {
public Map<String, Integer> getLoadDistribution(Set<String> fqdns);
public Map<String, Integer> getLoadDistribution(Set<String> fqdns, LoadDistributionListener listener);
}

View File

@ -47,11 +47,6 @@
<artifactId>nifi-distributed-cache-client-service-api</artifactId> <artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-load-distribution-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId> <artifactId>nifi-http-context-map-api</artifactId>

View File

@ -27,7 +27,6 @@
<module>nifi-oauth2-provider-bundle</module> <module>nifi-oauth2-provider-bundle</module>
<module>nifi-distributed-cache-client-service-api</module> <module>nifi-distributed-cache-client-service-api</module>
<module>nifi-distributed-cache-services-bundle</module> <module>nifi-distributed-cache-services-bundle</module>
<module>nifi-load-distribution-service-api</module>
<module>nifi-http-context-map-api</module> <module>nifi-http-context-map-api</module>
<module>nifi-lookup-service-api</module> <module>nifi-lookup-service-api</module>
<module>nifi-lookup-services-bundle</module> <module>nifi-lookup-services-bundle</module>

View File

@ -225,12 +225,6 @@
<artifactId>nifi-security-utils-api</artifactId> <artifactId>nifi-security-utils-api</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-load-distribution-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>