From d68720920fa10155eff81e1c1314a9286ba7a51c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 11 Mar 2020 09:41:56 -0400 Subject: [PATCH] NIFI-7242: When a Parameter is changed, any property referencing that parameter should have its #onPropertyModified method called. Also renamed Accumulo tests to integration tests because they start embedded servers and connect to them, which caused failures in my environment. Also fixed a bug in TestLengthDelimitedJournal because it was resulting in failures when building locally as well. Signed-off-by: Pierre Villard This closes #4134. --- .../expression/language/TestQuery.java | 5 + .../language/TestStandardPreparedQuery.java | 5 + .../nifi/parameter/ParameterLookup.java | 14 ++ .../nifi/wali/TestLengthDelimitedJournal.java | 13 +- .../{TestPutRecord.java => PutRecordIT.java} | 2 +- ...tScanAccumulo.java => ScanAccumuloIT.java} | 2 +- .../controller/AbstractComponentNode.java | 103 ++++++++++++++- .../apache/nifi/controller/ComponentNode.java | 7 + .../controller/PropertyConfiguration.java | 26 ++-- .../org/apache/nifi/groups/ProcessGroup.java | 5 +- .../nifi/parameter/ParameterContext.java | 9 +- .../nifi/parameter/ParameterUpdate.java | 39 ++++++ .../controller/TestAbstractComponentNode.java | 123 +++++++++++++++++- .../nifi/groups/StandardProcessGroup.java | 90 +++++++++++-- .../parameter/StandardParameterContext.java | 23 +++- .../parameter/StandardParameterUpdate.java | 83 ++++++++++++ .../service/mock/MockProcessGroup.java | 3 +- 17 files changed, 500 insertions(+), 52 deletions(-) rename nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/{TestPutRecord.java => PutRecordIT.java} (99%) rename nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/{TestScanAccumulo.java => ScanAccumuloIT.java} (99%) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterUpdate.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterUpdate.java diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java index d48129dfd4..7f17f1259e 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java @@ -2228,5 +2228,10 @@ public class TestQuery { public boolean isEmpty() { return parameters.isEmpty(); } + + @Override + public long getVersion() { + return 0; + } } } diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java index 09c50d9eb9..546160d7b4 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java @@ -252,6 +252,11 @@ public class TestStandardPreparedQuery { return Optional.ofNullable(parameters.get(parameterName)); } + @Override + public long getVersion() { + return 0; + } + @Override public boolean isEmpty() { return parameters.isEmpty(); diff --git a/nifi-commons/nifi-parameter/src/main/java/org/apache/nifi/parameter/ParameterLookup.java b/nifi-commons/nifi-parameter/src/main/java/org/apache/nifi/parameter/ParameterLookup.java index 3d97170050..1ec6790107 100644 --- a/nifi-commons/nifi-parameter/src/main/java/org/apache/nifi/parameter/ParameterLookup.java +++ b/nifi-commons/nifi-parameter/src/main/java/org/apache/nifi/parameter/ParameterLookup.java @@ -33,6 +33,15 @@ public interface ParameterLookup { */ boolean isEmpty(); + /** + * Indicates the current Version of the Parameter Context. Each time that the Parameter Context is updated, its version is incremented. This allows + * other components to know whether or not the values have changed since some other point in time. The version may or may not be persisted across + * restarts of the application. + * + * @return the current version + */ + long getVersion(); + ParameterLookup EMPTY = new ParameterLookup() { @Override @@ -40,6 +49,11 @@ public interface ParameterLookup { return Optional.empty(); } + @Override + public long getVersion() { + return 0; + } + @Override public boolean isEmpty() { return true; diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java index d54ba9c910..c3572f96ba 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java @@ -43,6 +43,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -480,12 +481,12 @@ public class TestLengthDelimitedJournal { // the BADOS will sleep for 1 second before writing. This allwos other threads to trigger corruption in the repo in the meantime. final ByteArrayDataOutputStream pausingBados = new ByteArrayDataOutputStream(4096) { private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - private int count = 0; + private final AtomicInteger count = new AtomicInteger(0); @Override public ByteArrayOutputStream getByteArrayOutputStream() { // Pause only on the second iteration. - if (count++ == 1) { + if (count.getAndIncrement() == 1) { try { Thread.sleep(1000L); } catch (final InterruptedException ie) { @@ -503,11 +504,11 @@ public class TestLengthDelimitedJournal { final Supplier badosSupplier = new Supplier() { - private int count = 0; + private final AtomicInteger count = new AtomicInteger(0); @Override public ByteArrayDataOutputStream get() { - if (count++ == 0) { + if (count.getAndIncrement() == 0) { return pausingBados; } @@ -525,11 +526,11 @@ public class TestLengthDelimitedJournal { final Thread[] threads = new Thread[2]; final LengthDelimitedJournal journal = new LengthDelimitedJournal(journalFile, serdeFactory, corruptingStreamPool, 0L) { - private int count = 0; + private final AtomicInteger count = new AtomicInteger(0); @Override protected void poison(final Throwable t) { - if (count++ == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down. + if (count.getAndIncrement() == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down. try { Thread.sleep(3000L); } catch (InterruptedException e) { diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java similarity index 99% rename from nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java rename to nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java index 2d45a486ea..c8bf47c3b8 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java @@ -52,7 +52,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -public class TestPutRecord { +public class PutRecordIT { public static final String DEFAULT_COLUMN_FAMILY = "family1"; diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java similarity index 99% rename from nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java rename to nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java index 3be8c7282f..4abd8cbc8b 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java @@ -54,7 +54,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -public class TestScanAccumulo { +public class ScanAccumuloIT { public static final String DEFAULT_COLUMN_FAMILY = "family1"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index adf410f088..a1f5af12a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -36,9 +36,12 @@ import org.apache.nifi.parameter.ExpressionLanguageAgnosticParameterParser; import org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser; import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterDescriptor; +import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.parameter.ParameterParser; import org.apache.nifi.parameter.ParameterReference; import org.apache.nifi.parameter.ParameterTokenList; +import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; @@ -815,12 +818,110 @@ public abstract class AbstractComponentNode implements ComponentNode { } - private void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + protected void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) { getComponent().onPropertyModified(descriptor, oldValue, newValue); } } + @Override + public void onParametersModified(final Map updatedParameters) { + // If the component doesn't reference any parameters, then there's nothing to be done. + if (!isReferencingParameter()) { + return; + } + + final ParameterLookup previousParameterLookup = createParameterLookupForPreviousValues(updatedParameters); + + // For any Property that references an updated Parameter, we need to call onPropertyModified(). + // Additionally, we need to trigger validation to run if this component is affected by the parameter update. + boolean componentAffected = false; + for (final Map.Entry entry : properties.entrySet()) { + final PropertyDescriptor propertyDescriptor = entry.getKey(); + final PropertyConfiguration configuration = entry.getValue(); + + // Determine if this property is affected by the Parameter Update + boolean propertyAffected = false; + final List parameterReferences = configuration.getParameterReferences(); + for (final ParameterReference reference : parameterReferences) { + final String referencedParamName = reference.getParameterName(); + if (updatedParameters.containsKey(referencedParamName)) { + propertyAffected = true; + componentAffected = true; + break; + } + } + + if (propertyAffected) { + final String previousValue = configuration.getEffectiveValue(previousParameterLookup); + final String updatedValue = configuration.getEffectiveValue(getParameterContext()); + + // Check if the value of the property is truly affected. It's possible that we could have a property configured as something like "#{a}#{b}" + // Where parameter a = "abc-" and b = "cba". The update could change a to "abc" and b to "-cba". As a result, the property value previously was "abc-cba" and still is. + // In such a case, we should not call onPropertyModified. + final boolean propertyUpdated = !Objects.equals(previousValue, updatedValue); + if (propertyUpdated) { + try { + logger.debug("Parameter Context updated, resulting in property {} of {} changing. Calling onPropertyModified().", propertyDescriptor, this); + onPropertyModified(propertyDescriptor, previousValue, updatedValue); + } catch (final Exception e) { + // nothing really to do here... + logger.error("Failed to notify {} that property {} changed", this, propertyDescriptor, e); + } + } else { + logger.debug("Parameter Context updated, and property {} of {} does reference the updated Parameters. However, the overall property value remained unchanged so will not call " + + "onPropertyModified().", propertyDescriptor, this); + } + } + } + + // If this component is affected by the Parameter change, we need to re-validate + if (componentAffected) { + logger.debug("Configuration of {} changed due to an update to Parameter Context. Resetting validation state", this); + resetValidationState(); + } + } + + private ParameterLookup createParameterLookupForPreviousValues(final Map updatedParameters) { + final ParameterContext currentContext = getParameterContext(); + return new ParameterLookup() { + @Override + public Optional getParameter(final String parameterName) { + final Optional optionalParameter = currentContext == null ? Optional.empty() : currentContext.getParameter(parameterName); + + // Check if there's an update to the parameter. If not, just return the parameter as-is. + final ParameterUpdate parameterUpdate = updatedParameters.get(parameterName); + if (parameterUpdate == null) { + return optionalParameter; + } + + // There is an update to the parameter. We want to return the previous value of the Parameter. + final ParameterDescriptor parameterDescriptor; + if (optionalParameter.isPresent()) { + parameterDescriptor = optionalParameter.get().getDescriptor(); + } else { + parameterDescriptor = new ParameterDescriptor.Builder() + .name(parameterName) + .description("") + .sensitive(true) + .build(); + } + + final Parameter updatedParameter = new Parameter(parameterDescriptor, parameterUpdate.getPreviousValue()); + return Optional.of(updatedParameter); + } + + @Override + public boolean isEmpty() { + return (currentContext == null || currentContext.isEmpty()) && updatedParameters.isEmpty(); + } + + @Override + public long getVersion() { + return 0; + } + }; + } @Override public ValidationStatus getValidationStatus() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java index c8b779c45f..de7612a93c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java @@ -34,6 +34,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.validation.ValidationState; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.ComponentVariableRegistry; import java.net.URL; @@ -71,6 +72,12 @@ public interface ComponentNode extends ComponentAuthorizable { boolean isReferencingParameter(); + /** + * Notifies the Component that the value of a parameter has changed + * @param parameterUpdates a Map of Parameter name to a ParameterUpdate that describes how the Parameter changed + */ + void onParametersModified(Map parameterUpdates); + /** *

* Pause triggering asynchronous validation to occur when the component is updated. Often times, it is necessary diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/PropertyConfiguration.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/PropertyConfiguration.java index 9121d02c30..d25a981281 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/PropertyConfiguration.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/PropertyConfiguration.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.controller; -import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.parameter.ParameterReference; import org.apache.nifi.parameter.ParameterTokenList; import org.apache.nifi.parameter.StandardParameterTokenList; @@ -44,7 +44,7 @@ public class PropertyConfiguration { return rawValue; } - public String getEffectiveValue(final ParameterContext parameterContext) { + public String getEffectiveValue(final ParameterLookup parameterLookup) { if (rawValue == null) { return null; } @@ -57,12 +57,12 @@ public class PropertyConfiguration { // cache the Effective Value because we may have a different Parameter Context. So, we cache a Tuple of // the Parameter Context and the effective value for that Parameter Context. final ComputedEffectiveValue computedEffectiveValue = effectiveValue.get(); - if (computedEffectiveValue != null && computedEffectiveValue.matches(parameterContext)) { + if (computedEffectiveValue != null && computedEffectiveValue.matches(parameterLookup)) { return computedEffectiveValue.getValue(); } - final String substituted = parameterTokenList.substitute(parameterContext); - final ComputedEffectiveValue updatedValue = new ComputedEffectiveValue(parameterContext, substituted); + final String substituted = parameterTokenList.substitute(parameterLookup); + final ComputedEffectiveValue updatedValue = new ComputedEffectiveValue(parameterLookup, substituted); effectiveValue.compareAndSet(computedEffectiveValue, updatedValue); return substituted; } @@ -96,13 +96,13 @@ public class PropertyConfiguration { public static class ComputedEffectiveValue { - private final ParameterContext parameterContext; + private final ParameterLookup parameterLookup; private final long contextVersion; private final String value; - public ComputedEffectiveValue(final ParameterContext parameterContext, final String value) { - this.parameterContext = parameterContext; - this.contextVersion = parameterContext == null ? -1 : parameterContext.getVersion(); + public ComputedEffectiveValue(final ParameterLookup parameterLookup, final String value) { + this.parameterLookup = parameterLookup; + this.contextVersion = parameterLookup == null ? -1 : parameterLookup.getVersion(); this.value = value; } @@ -110,16 +110,16 @@ public class PropertyConfiguration { return value; } - public boolean matches(final ParameterContext context) { - if (!Objects.equals(context, this.parameterContext)) { + public boolean matches(final ParameterLookup parameterLookup) { + if (!Objects.equals(parameterLookup, this.parameterLookup)) { return false; } - if (context == null) { + if (parameterLookup == null) { return true; } - return context.getVersion() == contextVersion; + return parameterLookup.getVersion() == contextVersion; } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 166977b94d..f4ec742d9a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -34,6 +34,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.processor.Processor; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; @@ -1043,6 +1044,8 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi /** * Called to notify the Process Group whenever the Parameter Context that it is bound to has changed. + * + * @param updatedParameters a Map of parameter name to the ParameterUpdate that describes how the Parameter was updated */ - void onParameterContextUpdated(); + void onParameterContextUpdated(Map updatedParameters); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterContext.java index f06132090f..9bfc63b5ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterContext.java @@ -91,12 +91,5 @@ public interface ParameterContext extends ParameterLookup, ComponentAuthorizable */ ParameterReferenceManager getParameterReferenceManager(); - /** - * Indicates the current Version of the Parameter Context. Each time that the Parameter Context is updated, its version is incremented. This allows - * other components to know whether or not the values have changed since some other point in time. The version may or may not be persisted across - * restarts of the application. - * - * @return the current version - */ - long getVersion(); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterUpdate.java new file mode 100644 index 0000000000..a5743f329d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/parameter/ParameterUpdate.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parameter; + +public interface ParameterUpdate { + /** + * @return the name of the Parameter + */ + String getParameterName(); + + /** + * @return the previous value of the Parameter, or null if the Parameter is being added or previously had no value + */ + String getPreviousValue(); + + /** + * @return the updated value of the Parameter, or null if the Parameter was removed or previously had no value + */ + String getUpdatedValue(); + + /** + * @return true if the Parameter is sensitive, false otherwise. + */ + boolean isSensitive(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java index 65a2471420..6ea143ee83 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java @@ -17,25 +17,33 @@ package org.apache.nifi.controller; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterDescriptor; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.ComponentVariableRegistry; import org.junit.Test; import org.mockito.Mockito; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +59,52 @@ public class TestAbstractComponentNode { assertEquals(ValidationStatus.VALIDATING, status); } + public void testOnParametersModified() { + final AtomicLong validationCount = new AtomicLong(0L); + final ValidationTrigger validationTrigger = new ValidationTrigger() { + @Override + public void triggerAsync(ComponentNode component) { + validationCount.incrementAndGet(); + } + + @Override + public void trigger(ComponentNode component) { + validationCount.incrementAndGet(); + } + }; + + final List propertyModifications = new ArrayList<>(); + final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode(0, validationTrigger) { + @Override + protected void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + propertyModifications.add(new PropertyModification(descriptor, oldValue, newValue)); + super.onPropertyModified(descriptor, oldValue, newValue); + } + }; + + final Map properties = new HashMap<>(); + properties.put("abc", "#{abc}"); + node.setProperties(properties); + + final ParameterContext context = Mockito.mock(ParameterContext.class); + final ParameterDescriptor paramDescriptor = new ParameterDescriptor.Builder() + .name("abc") + .description("") + .sensitive(false) + .build(); + final Parameter param = new Parameter(paramDescriptor, "123"); + Mockito.doReturn(param).when(context).getParameter("abc"); + + final Map updatedParameters = new HashMap<>(); + updatedParameters.put("abc", new MockParameterUpdate("abc", "xyz", "123", false)); + node.onParametersModified(updatedParameters); + + assertEquals(1, propertyModifications.size()); + final PropertyModification mod = propertyModifications.get(0); + assertEquals("xyz", mod.getPreviousValue()); + assertEquals("123", mod.getUpdatedValue()); + } + @Test(timeout = 10000) public void testValidationTriggerPaused() throws InterruptedException { final AtomicLong validationCount = new AtomicLong(0L); @@ -84,6 +138,7 @@ public class TestAbstractComponentNode { private static class ValidationControlledAbstractComponentNode extends AbstractComponentNode { private final long pauseMillis; + private volatile ParameterContext paramContext = null; public ValidationControlledAbstractComponentNode(final long pauseMillis, final ValidationTrigger validationTrigger) { super("id", Mockito.mock(ValidationContextFactory.class), Mockito.mock(ControllerServiceProvider.class), "unit test component", @@ -168,7 +223,69 @@ public class TestAbstractComponentNode { @Override protected ParameterContext getParameterContext() { - return null; + return paramContext; + } + + protected void setParameterContext(final ParameterContext parameterContext) { + this.paramContext = parameterContext; + } + } + + private static class PropertyModification { + private final PropertyDescriptor propertyDescriptor; + private final String previousValue; + private final String updatedValue; + + public PropertyModification(final PropertyDescriptor propertyDescriptor, final String previousValue, final String updatedValue) { + this.propertyDescriptor = propertyDescriptor; + this.previousValue = previousValue; + this.updatedValue = updatedValue; + } + + public PropertyDescriptor getPropertyDescriptor() { + return propertyDescriptor; + } + + public String getPreviousValue() { + return previousValue; + } + + public String getUpdatedValue() { + return updatedValue; + } + } + + private static class MockParameterUpdate implements ParameterUpdate { + private final String parameterName; + private final String oldValue; + private final String newValue; + private final boolean sensitive; + + public MockParameterUpdate(final String parameterName, final String oldValue, final String newValue, final boolean sensitive) { + this.parameterName = parameterName; + this.oldValue = oldValue; + this.newValue = newValue; + this.sensitive = sensitive; + } + + @Override + public String getParameterName() { + return parameterName; + } + + @Override + public String getPreviousValue() { + return oldValue; + } + + @Override + public String getUpdatedValue() { + return newValue; + } + + @Override + public boolean isSensitive() { + return sensitive; } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index f4af7fc27e..cf9045ff69 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -75,6 +75,8 @@ import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.parameter.ParameterReference; +import org.apache.nifi.parameter.ParameterUpdate; +import org.apache.nifi.parameter.StandardParameterUpdate; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.ComponentVariableRegistry; @@ -156,6 +158,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -2911,32 +2914,91 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void setParameterContext(final ParameterContext parameterContext) { verifyCanSetParameterContext(parameterContext); + + // Determine which parameters have changed so that components can be appropriately updated. + final Map updatedParameters = mapParameterUpdates(this.parameterContext, parameterContext); + LOG.debug("Parameter Context for {} changed from {} to {}. This resulted in {} Parameter Updates ({}). Notifying Processors/Controller Services of the updates.", + this, this.parameterContext, parameterContext, updatedParameters.size(), updatedParameters); + this.parameterContext = parameterContext; - getProcessors().forEach(ProcessorNode::resetValidationState); - getControllerServices(false).forEach(ControllerServiceNode::resetValidationState); + if (!updatedParameters.isEmpty()) { + // Notify components that parameters have been updated + onParameterContextUpdated(updatedParameters); + } } @Override - public void onParameterContextUpdated() { + public void onParameterContextUpdated(final Map updatedParameters) { readLock.lock(); try { - for (final ProcessorNode processorNode : getProcessors()) { - if (processorNode.isReferencingParameter() && processorNode.getScheduledState() != ScheduledState.RUNNING) { - processorNode.resetValidationState(); - } - } - - for (final ControllerServiceNode serviceNode : getControllerServices(false)) { - if (serviceNode.isReferencingParameter() && serviceNode.getState() == ControllerServiceState.DISABLING || serviceNode.getState() == ControllerServiceState.DISABLED) { - serviceNode.resetValidationState(); - } - } + getProcessors().forEach(proc -> proc.onParametersModified(updatedParameters)); + getControllerServices(false).forEach(cs -> cs.onParametersModified(updatedParameters)); } finally { readLock.unlock(); } } + private Map mapParameterUpdates(final ParameterContext previousParameterContext, final ParameterContext updatedParameterContext) { + if (previousParameterContext == null && updatedParameterContext == null) { + return Collections.emptyMap(); + } + if (updatedParameterContext == null) { + return createParameterUpdates(previousParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), value, null, descriptor.isSensitive())); + } + if (previousParameterContext == null) { + return createParameterUpdates(updatedParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), null, value, descriptor.isSensitive())); + } + + // For each Parameter in the updated parameter context, add a ParameterUpdate to our map + final Map updatedParameters = new HashMap<>(); + for (final Map.Entry entry : updatedParameterContext.getParameters().entrySet()) { + final ParameterDescriptor updatedDescriptor = entry.getKey(); + final Parameter updatedParameter = entry.getValue(); + + final Optional previousParameterOption = previousParameterContext.getParameter(updatedDescriptor); + final String previousValue = previousParameterOption.map(Parameter::getValue).orElse(null); + final String updatedValue = updatedParameter.getValue(); + + if (!Objects.equals(previousValue, updatedValue)) { + final ParameterUpdate parameterUpdate = new StandardParameterUpdate(updatedDescriptor.getName(), previousValue, updatedValue, updatedDescriptor.isSensitive()); + updatedParameters.put(updatedDescriptor.getName(), parameterUpdate); + } + } + + // For each Parameter that was in the previous parameter context that is not in the updated Paramter Context, add a ParameterUpdate to our map with `null` for the updated value + for (final Map.Entry entry : previousParameterContext.getParameters().entrySet()) { + final ParameterDescriptor previousDescriptor = entry.getKey(); + final Parameter previousParameter = entry.getValue(); + + final Optional updatedParameterOption = updatedParameterContext.getParameter(previousDescriptor); + if (updatedParameterOption.isPresent()) { + // The value exists in both Parameter Contexts. If it was changed, a Parameter Update has already been added to the map, above. + continue; + } + + final ParameterUpdate parameterUpdate = new StandardParameterUpdate(previousDescriptor.getName(), previousParameter.getValue(), null, previousDescriptor.isSensitive()); + updatedParameters.put(previousDescriptor.getName(), parameterUpdate); + } + + return updatedParameters; + } + + private Map createParameterUpdates(final ParameterContext parameterContext, final BiFunction parameterUpdateMapper) { + final Map updatedParameters = new HashMap<>(); + + for (final Map.Entry entry : parameterContext.getParameters().entrySet()) { + final ParameterDescriptor parameterDescriptor = entry.getKey(); + final Parameter parameter = entry.getValue(); + + final ParameterUpdate parameterUpdate = parameterUpdateMapper.apply(parameterDescriptor, parameter.getValue()); + updatedParameters.put(parameterDescriptor.getName(), parameterUpdate); + } + + return updatedParameters; + } + + @Override public void verifyCanSetParameterContext(final ParameterContext parameterContext) { readLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java index 7af68fc265..b6cae2655a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java @@ -30,6 +30,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -107,33 +108,44 @@ public class StandardParameterContext implements ParameterContext { this.version++; verifyCanSetParameters(updatedParameters); + final Map parameterUpdates = new HashMap<>(); boolean changeAffectingComponents = false; for (final Map.Entry entry : updatedParameters.entrySet()) { final String parameterName = entry.getKey(); final Parameter parameter = entry.getValue(); if (parameter == null) { - final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder().name(parameterName).build(); - parameters.remove(parameterDescriptor); changeAffectingComponents = true; + + final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder().name(parameterName).build(); + final Parameter oldParameter = parameters.remove(parameterDescriptor); + + parameterUpdates.put(parameterName, new StandardParameterUpdate(parameterName, oldParameter.getValue(), null, parameterDescriptor.isSensitive())); } else { final Parameter updatedParameter = createFullyPopulatedParameter(parameter); final Parameter oldParameter = parameters.put(updatedParameter.getDescriptor(), updatedParameter); if (oldParameter == null || !Objects.equals(oldParameter.getValue(), updatedParameter.getValue())) { changeAffectingComponents = true; + + final String previousValue = oldParameter == null ? null : oldParameter.getValue(); + parameterUpdates.put(parameterName, new StandardParameterUpdate(parameterName, previousValue, updatedParameter.getValue(), updatedParameter.getDescriptor().isSensitive())); } } } if (changeAffectingComponents) { + logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates); + for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) { try { - processGroup.onParameterContextUpdated(); + processGroup.onParameterContextUpdated(parameterUpdates); } catch (final Exception e) { logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e); } } + } else { + logger.debug("Parameter Context {} was updated. {} parameters changed ({}). No existing components are affected.", this, parameterUpdates.size(), parameterUpdates); } } finally { writeLock.unlock(); @@ -324,6 +336,11 @@ public class StandardParameterContext implements ParameterContext { } } + @Override + public String toString() { + return "StandardParameterContext[name=" + name + "]"; + } + @Override public Authorizable getParentAuthorizable() { return new Authorizable() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterUpdate.java new file mode 100644 index 0000000000..afb963a69d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/parameter/StandardParameterUpdate.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parameter; + +import java.util.Objects; + +public class StandardParameterUpdate implements ParameterUpdate { + private final String parameterName; + private final String previousValue; + private final String updatedValue; + private final boolean sensitiveParameter; + + public StandardParameterUpdate(final String parameterName, final String previousValue, final String updatedValue, final boolean sensitiveParameter) { + this.parameterName = parameterName; + this.previousValue = previousValue; + this.updatedValue = updatedValue; + this.sensitiveParameter = sensitiveParameter; + } + + @Override + public String getParameterName() { + return parameterName; + } + + @Override + public String getPreviousValue() { + return previousValue; + } + + @Override + public String getUpdatedValue() { + return updatedValue; + } + + @Override + public boolean isSensitive() { + return sensitiveParameter; + } + + @Override + public String toString() { + if (sensitiveParameter) { + return "StandardParameterUpdate[parameterName=" + parameterName + ", sensitive=true]"; + } else { + return "StandardParameterUpdate[parameterName=" + parameterName + ", sensitive=false, previous value='" + previousValue + "', updated value='" + updatedValue + "']"; + } + } + + @Override + public boolean equals(final Object o) { + if (this == o){ + return true; + } + + if (!(o instanceof StandardParameterUpdate)) { + return false; + } + + final StandardParameterUpdate that = (StandardParameterUpdate) o; + return Objects.equals(parameterName, that.parameterName) + && Objects.equals(previousValue, that.previousValue) + && Objects.equals(updatedValue, that.updatedValue); + } + + @Override + public int hashCode() { + return Objects.hash(parameterName, previousValue, updatedValue); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 30e17f16db..d1dc32c8da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -36,6 +36,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; @@ -710,7 +711,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void onParameterContextUpdated() { + public void onParameterContextUpdated(final Map updatedParameters) { } @Override