diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index b6a751eabf..51d63fd09f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -265,11 +265,6 @@
nifi-socket-utils
2.0.0-SNAPSHOT
-
- org.apache.nifi
- nifi-load-distribution-service-api
- 2.0.0-SNAPSHOT
-
javax.jms
javax.jms-api
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 1009846373..fd4ac28b4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -17,19 +17,16 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
@@ -46,13 +43,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.loading.LoadDistributionListener;
-import org.apache.nifi.loading.LoadDistributionService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -82,15 +76,12 @@ public class DistributeLoad extends AbstractProcessor {
public static final String ROUND_ROBIN = "round robin";
public static final String NEXT_AVAILABLE = "next available";
- public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final String OVERFLOW = "overflow";
public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
"Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
"Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
- public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
- "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
"Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
@@ -107,44 +98,15 @@ public class DistributeLoad extends AbstractProcessor {
.name("Distribution Strategy")
.description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
.required(true)
- .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW)
+ .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_OVERFLOW)
.defaultValue(ROUND_ROBIN)
.build();
- public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
- .name("Hostnames")
- .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
- .required(true)
- .addValidator((subject, input, context) -> {
- ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
- if (null == input) {
- result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need to specify delimited list of FQDNs").build();
- return result;
- }
- String[] hostNames = input.split("(?:,+|;+|\\s+)");
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
- result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need a FQDN rather than a simple host name.").build();
- return result;
- }
- }
- return result;
- }).build();
- public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
- .name("Load Distribution Service ID")
- .description("The identifier of the Load Distribution Service")
- .required(true)
- .identifiesControllerService(LoadDistributionService.class)
- .build();
public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List properties;
private final AtomicReference> relationshipsRef = new AtomicReference<>();
private final AtomicReference strategyRef = new AtomicReference<>(new RoundRobinStrategy());
private final AtomicReference> weightedRelationshipListRef = new AtomicReference<>();
- private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
- private volatile LoadDistributionListener myListener;
private final AtomicBoolean doSetProps = new AtomicBoolean(true);
@Override
@@ -185,9 +147,6 @@ public class DistributeLoad extends AbstractProcessor {
case NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy());
break;
- case LOAD_DISTRIBUTION_SERVICE:
- strategyRef.set(new LoadDistributionStrategy());
- break;
case OVERFLOW:
strategyRef.set(new OverflowStrategy());
break;
@@ -195,18 +154,12 @@ public class DistributeLoad extends AbstractProcessor {
throw new IllegalStateException("Invalid distribution strategy");
}
doSetProps.set(true);
- doCustomValidate.set(true);
}
}
@Override
protected List getSupportedPropertyDescriptors() {
- if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) {
- final List props = new ArrayList<>(properties);
- props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
- props.add(HOSTNAMES);
- this.properties = Collections.unmodifiableList(props);
- } else if (doSetProps.getAndSet(false)) {
+ if (doSetProps.getAndSet(false)) {
final List props = new ArrayList<>();
props.add(NUM_RELATIONSHIPS);
props.add(DISTRIBUTION_STRATEGY);
@@ -235,113 +188,22 @@ public class DistributeLoad extends AbstractProcessor {
.name(propertyDescriptorName).dynamic(true).build();
}
- @Override
- protected Collection customValidate(ValidationContext validationContext) {
- Collection results = new ArrayList<>();
- if (doCustomValidate.getAndSet(false)) {
- String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
- // make sure Hostnames and Controller service are set
- PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
- if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
- results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName())
- .explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build());
- }
- propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
- if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
- results.add(new ValidationResult.Builder()
- .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
- .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
- .valid(false).build());
- }
- if (results.isEmpty()) {
- int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
- String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
- String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
- int numHosts = 0;
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName)) {
- hostNames[numHosts++] = hostName;
- }
- }
- if (numHosts > numRels) {
- results.add(new ValidationResult.Builder()
- .subject("Number of Relationships and Hostnames")
- .explanation("Number of Relationships must be equal to, or greater than, the number of host names")
- .valid(false).build());
- } else {
- // create new relationships with descriptions of hostname
- Set relsWithDesc = new TreeSet<>();
- for (int i = 0; i < numHosts; i++) {
- relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1))
- .description(hostNames[i]).build());
- }
- // add add'l rels if configuration requires it...it probably shouldn't
- for (int i = numHosts + 1; i <= numRels; i++) {
- relsWithDesc.add(createRelationship(i));
- }
- relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
- }
- }
- }
- }
- return results;
- }
-
@OnScheduled
public void createWeightedList(final ProcessContext context) {
final Map weightings = new LinkedHashMap<>();
- String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
- String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
- String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
- Set hostNameSet = new HashSet<>();
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName)) {
- hostNameSet.add(hostName);
- }
- }
- LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
- myListener = new LoadDistributionListener() {
-
- @Override
- public void update(Map loadInfo) {
- for (Relationship rel : relationshipsRef.get()) {
- String hostname = rel.getDescription();
- Integer weight = 1;
- if (loadInfo.containsKey(hostname)) {
- weight = loadInfo.get(hostname);
- }
- weightings.put(Integer.decode(rel.getName()), weight);
- }
- updateWeightedRelationships(weightings);
- }
- };
-
- Map loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
- for (Relationship rel : relationshipsRef.get()) {
- String hostname = rel.getDescription();
- Integer weight = 1;
- if (loadInfo.containsKey(hostname)) {
- weight = loadInfo.get(hostname);
- }
- weightings.put(Integer.decode(rel.getName()), weight);
- }
-
- } else {
- final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
- for (int i = 1; i <= numRelationships; i++) {
- weightings.put(i, 1);
- }
- for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
- if (!this.properties.contains(propDesc)) {
- final int relationship = Integer.parseInt(propDesc.getName());
- final int weighting = context.getProperty(propDesc).asInteger();
- weightings.put(relationship, weighting);
- }
+ final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+ for (int i = 1; i <= numRelationships; i++) {
+ weightings.put(i, 1);
+ }
+ for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
+ if (!this.properties.contains(propDesc)) {
+ final int relationship = Integer.parseInt(propDesc.getName());
+ final int weighting = context.getProperty(propDesc).asInteger();
+ weightings.put(relationship, weighting);
}
}
+
updateWeightedRelationships(weightings);
}
@@ -422,41 +284,6 @@ public class DistributeLoad extends AbstractProcessor {
boolean requiresAllDestinationsAvailable();
}
- private class LoadDistributionStrategy implements DistributionStrategy {
-
- private final AtomicLong counter = new AtomicLong(0L);
-
- @Override
- public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
- final List relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
- final int numRelationships = relationshipList.size();
-
- // create a HashSet that contains all of the available relationships, as calling #contains on HashSet
- // is much faster than calling it on a List
- boolean foundFreeRelationship = false;
- Relationship relationship = null;
-
- int attempts = 0;
- while (!foundFreeRelationship) {
- final long counterValue = counter.getAndIncrement();
- final int idx = (int) (counterValue % numRelationships);
- relationship = relationshipList.get(idx);
- foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
- if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
- return null;
- }
- }
-
- return relationship;
- }
-
- @Override
- public boolean requiresAllDestinationsAvailable() {
- return false;
- }
-
- }
-
private class RoundRobinStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml
deleted file mode 100644
index 2dcca6b49a..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-
-
- 4.0.0
-
- org.apache.nifi
- nifi-standard-services
- 2.0.0-SNAPSHOT
-
- nifi-load-distribution-service-api
- jar
-
-
- org.apache.nifi
- nifi-api
-
-
-
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java
deleted file mode 100644
index 656bf99f1b..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.loading;
-
-import java.util.Map;
-
-public interface LoadDistributionListener {
-
- public void update(Map loadInfo);
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java
deleted file mode 100644
index c413975e93..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.loading;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * A service that will provide a Map of Fully Qualified Domain Names (fqdn) with
- * their respective weights (scale of 1 - 100).
- */
-public interface LoadDistributionService extends ControllerService {
-
- public Map getLoadDistribution(Set fqdns);
-
- public Map getLoadDistribution(Set fqdns, LoadDistributionListener listener);
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index 13021001fb..b672f52661 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -47,11 +47,6 @@
nifi-distributed-cache-client-service-api
compile
-
- org.apache.nifi
- nifi-load-distribution-service-api
- compile
-
org.apache.nifi
nifi-http-context-map-api
diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml
index fdbb3c38ff..260da7422c 100644
--- a/nifi-nar-bundles/nifi-standard-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/pom.xml
@@ -27,7 +27,6 @@
nifi-oauth2-provider-bundle
nifi-distributed-cache-client-service-api
nifi-distributed-cache-services-bundle
- nifi-load-distribution-service-api
nifi-http-context-map-api
nifi-lookup-service-api
nifi-lookup-services-bundle
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 365bda49a6..68bfaee164 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -225,12 +225,6 @@
nifi-security-utils-api
2.0.0-SNAPSHOT
provided
-
-
- org.apache.nifi
- nifi-load-distribution-service-api
- 2.0.0-SNAPSHOT
- provided
org.apache.nifi