From dbef536ebd2b8883de93fc2e0bf7574f7cc92006 Mon Sep 17 00:00:00 2001 From: Nissim Shiman Date: Fri, 20 Jan 2023 20:06:42 +0000 Subject: [PATCH] NIFI-10950 DistributeLoad processor - this closes #6924. removed Load Distribution Service Signed-off-by: Joe Witt --- .../nifi-standard-processors/pom.xml | 5 - .../processors/standard/DistributeLoad.java | 197 ++---------------- .../pom.xml | 30 --- .../loading/LoadDistributionListener.java | 24 --- .../nifi/loading/LoadDistributionService.java | 33 --- .../nifi-standard-services-api-nar/pom.xml | 5 - .../nifi-standard-services/pom.xml | 1 - nifi-nar-bundles/pom.xml | 6 - 8 files changed, 12 insertions(+), 289 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index b6a751eabf..51d63fd09f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -265,11 +265,6 @@ nifi-socket-utils 2.0.0-SNAPSHOT - - org.apache.nifi - nifi-load-distribution-service-api - 2.0.0-SNAPSHOT - javax.jms javax.jms-api diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index 1009846373..fd4ac28b4d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@ -17,19 +17,16 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DefaultRunDuration; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicRelationship; @@ -46,13 +43,10 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.loading.LoadDistributionListener; -import org.apache.nifi.loading.LoadDistributionService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -82,15 +76,12 @@ public class DistributeLoad extends AbstractProcessor { public static final String ROUND_ROBIN = "round robin"; public static final String NEXT_AVAILABLE = "next available"; - public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service"; public static final String OVERFLOW = "overflow"; public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN, "Relationship selection is evenly distributed in a round robin fashion; all relationships must be available."); public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE, "Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available."); - public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE, - "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available."); public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW, "Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available."); @@ -107,44 +98,15 @@ public class DistributeLoad extends AbstractProcessor { .name("Distribution Strategy") .description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.") .required(true) - .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW) + .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_OVERFLOW) .defaultValue(ROUND_ROBIN) .build(); - public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder() - .name("Hostnames") - .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter") - .required(true) - .addValidator((subject, input, context) -> { - ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build(); - if (null == input) { - result = new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation("Need to specify delimited list of FQDNs").build(); - return result; - } - String[] hostNames = input.split("(?:,+|;+|\\s+)"); - for (String hostName : hostNames) { - if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) { - result = new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation("Need a FQDN rather than a simple host name.").build(); - return result; - } - } - return result; - }).build(); - public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder() - .name("Load Distribution Service ID") - .description("The identifier of the Load Distribution Service") - .required(true) - .identifiesControllerService(LoadDistributionService.class) - .build(); public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship"; private List properties; private final AtomicReference> relationshipsRef = new AtomicReference<>(); private final AtomicReference strategyRef = new AtomicReference<>(new RoundRobinStrategy()); private final AtomicReference> weightedRelationshipListRef = new AtomicReference<>(); - private final AtomicBoolean doCustomValidate = new AtomicBoolean(false); - private volatile LoadDistributionListener myListener; private final AtomicBoolean doSetProps = new AtomicBoolean(true); @Override @@ -185,9 +147,6 @@ public class DistributeLoad extends AbstractProcessor { case NEXT_AVAILABLE: strategyRef.set(new NextAvailableStrategy()); break; - case LOAD_DISTRIBUTION_SERVICE: - strategyRef.set(new LoadDistributionStrategy()); - break; case OVERFLOW: strategyRef.set(new OverflowStrategy()); break; @@ -195,18 +154,12 @@ public class DistributeLoad extends AbstractProcessor { throw new IllegalStateException("Invalid distribution strategy"); } doSetProps.set(true); - doCustomValidate.set(true); } } @Override protected List getSupportedPropertyDescriptors() { - if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) { - final List props = new ArrayList<>(properties); - props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); - props.add(HOSTNAMES); - this.properties = Collections.unmodifiableList(props); - } else if (doSetProps.getAndSet(false)) { + if (doSetProps.getAndSet(false)) { final List props = new ArrayList<>(); props.add(NUM_RELATIONSHIPS); props.add(DISTRIBUTION_STRATEGY); @@ -235,113 +188,22 @@ public class DistributeLoad extends AbstractProcessor { .name(propertyDescriptorName).dynamic(true).build(); } - @Override - protected Collection customValidate(ValidationContext validationContext) { - Collection results = new ArrayList<>(); - if (doCustomValidate.getAndSet(false)) { - String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue(); - if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) { - // make sure Hostnames and Controller service are set - PropertyValue propDesc = validationContext.getProperty(HOSTNAMES); - if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { - results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName()) - .explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build()); - } - propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); - if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { - results.add(new ValidationResult.Builder() - .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()) - .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy") - .valid(false).build()); - } - if (results.isEmpty()) { - int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger(); - String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue(); - String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); - int numHosts = 0; - for (String hostName : hostNames) { - if (StringUtils.isNotBlank(hostName)) { - hostNames[numHosts++] = hostName; - } - } - if (numHosts > numRels) { - results.add(new ValidationResult.Builder() - .subject("Number of Relationships and Hostnames") - .explanation("Number of Relationships must be equal to, or greater than, the number of host names") - .valid(false).build()); - } else { - // create new relationships with descriptions of hostname - Set relsWithDesc = new TreeSet<>(); - for (int i = 0; i < numHosts; i++) { - relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)) - .description(hostNames[i]).build()); - } - // add add'l rels if configuration requires it...it probably shouldn't - for (int i = numHosts + 1; i <= numRels; i++) { - relsWithDesc.add(createRelationship(i)); - } - relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc)); - } - } - } - } - return results; - } - @OnScheduled public void createWeightedList(final ProcessContext context) { final Map weightings = new LinkedHashMap<>(); - String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue(); - if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) { - String hostNamesValue = context.getProperty(HOSTNAMES).getValue(); - String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); - Set hostNameSet = new HashSet<>(); - for (String hostName : hostNames) { - if (StringUtils.isNotBlank(hostName)) { - hostNameSet.add(hostName); - } - } - LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class); - myListener = new LoadDistributionListener() { - - @Override - public void update(Map loadInfo) { - for (Relationship rel : relationshipsRef.get()) { - String hostname = rel.getDescription(); - Integer weight = 1; - if (loadInfo.containsKey(hostname)) { - weight = loadInfo.get(hostname); - } - weightings.put(Integer.decode(rel.getName()), weight); - } - updateWeightedRelationships(weightings); - } - }; - - Map loadInfo = svc.getLoadDistribution(hostNameSet, myListener); - for (Relationship rel : relationshipsRef.get()) { - String hostname = rel.getDescription(); - Integer weight = 1; - if (loadInfo.containsKey(hostname)) { - weight = loadInfo.get(hostname); - } - weightings.put(Integer.decode(rel.getName()), weight); - } - - } else { - final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); - for (int i = 1; i <= numRelationships; i++) { - weightings.put(i, 1); - } - for (final PropertyDescriptor propDesc : context.getProperties().keySet()) { - if (!this.properties.contains(propDesc)) { - final int relationship = Integer.parseInt(propDesc.getName()); - final int weighting = context.getProperty(propDesc).asInteger(); - weightings.put(relationship, weighting); - } + final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); + for (int i = 1; i <= numRelationships; i++) { + weightings.put(i, 1); + } + for (final PropertyDescriptor propDesc : context.getProperties().keySet()) { + if (!this.properties.contains(propDesc)) { + final int relationship = Integer.parseInt(propDesc.getName()); + final int weighting = context.getProperty(propDesc).asInteger(); + weightings.put(relationship, weighting); } } + updateWeightedRelationships(weightings); } @@ -422,41 +284,6 @@ public class DistributeLoad extends AbstractProcessor { boolean requiresAllDestinationsAvailable(); } - private class LoadDistributionStrategy implements DistributionStrategy { - - private final AtomicLong counter = new AtomicLong(0L); - - @Override - public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { - final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); - final int numRelationships = relationshipList.size(); - - // create a HashSet that contains all of the available relationships, as calling #contains on HashSet - // is much faster than calling it on a List - boolean foundFreeRelationship = false; - Relationship relationship = null; - - int attempts = 0; - while (!foundFreeRelationship) { - final long counterValue = counter.getAndIncrement(); - final int idx = (int) (counterValue % numRelationships); - relationship = relationshipList.get(idx); - foundFreeRelationship = context.getAvailableRelationships().contains(relationship); - if (++attempts % numRelationships == 0 && !foundFreeRelationship) { - return null; - } - } - - return relationship; - } - - @Override - public boolean requiresAllDestinationsAvailable() { - return false; - } - - } - private class RoundRobinStrategy implements DistributionStrategy { private final AtomicLong counter = new AtomicLong(0L); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml deleted file mode 100644 index 2dcca6b49a..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml +++ /dev/null @@ -1,30 +0,0 @@ - - - 4.0.0 - - org.apache.nifi - nifi-standard-services - 2.0.0-SNAPSHOT - - nifi-load-distribution-service-api - jar - - - org.apache.nifi - nifi-api - - - diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java deleted file mode 100644 index 656bf99f1b..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.loading; - -import java.util.Map; - -public interface LoadDistributionListener { - - public void update(Map loadInfo); -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java deleted file mode 100644 index c413975e93..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.loading; - -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.controller.ControllerService; - -/** - * A service that will provide a Map of Fully Qualified Domain Names (fqdn) with - * their respective weights (scale of 1 - 100). - */ -public interface LoadDistributionService extends ControllerService { - - public Map getLoadDistribution(Set fqdns); - - public Map getLoadDistribution(Set fqdns, LoadDistributionListener listener); -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index 13021001fb..b672f52661 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -47,11 +47,6 @@ nifi-distributed-cache-client-service-api compile - - org.apache.nifi - nifi-load-distribution-service-api - compile - org.apache.nifi nifi-http-context-map-api diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index fdbb3c38ff..260da7422c 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -27,7 +27,6 @@ nifi-oauth2-provider-bundle nifi-distributed-cache-client-service-api nifi-distributed-cache-services-bundle - nifi-load-distribution-service-api nifi-http-context-map-api nifi-lookup-service-api nifi-lookup-services-bundle diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 365bda49a6..68bfaee164 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -225,12 +225,6 @@ nifi-security-utils-api 2.0.0-SNAPSHOT provided - - - org.apache.nifi - nifi-load-distribution-service-api - 2.0.0-SNAPSHOT - provided org.apache.nifi