NIFI-7242: When a Parameter is changed, any property referencing that parameter should have its #onPropertyModified method called. Also renamed Accumulo tests to integration tests because they start embedded servers and connect to them, which caused failures in my environment. Also fixed a bug in TestLengthDelimitedJournal because it was resulting in failures when building locally as well.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4134.
This commit is contained in:
Mark Payne 2020-03-11 09:41:56 -04:00 committed by Pierre Villard
parent 67676ba5b4
commit d68720920f
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
17 changed files with 500 additions and 52 deletions

View File

@ -2228,5 +2228,10 @@ public class TestQuery {
public boolean isEmpty() { public boolean isEmpty() {
return parameters.isEmpty(); return parameters.isEmpty();
} }
@Override
public long getVersion() {
return 0;
}
} }
} }

View File

@ -252,6 +252,11 @@ public class TestStandardPreparedQuery {
return Optional.ofNullable(parameters.get(parameterName)); return Optional.ofNullable(parameters.get(parameterName));
} }
@Override
public long getVersion() {
return 0;
}
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
return parameters.isEmpty(); return parameters.isEmpty();

View File

@ -33,6 +33,15 @@ public interface ParameterLookup {
*/ */
boolean isEmpty(); boolean isEmpty();
/**
* Indicates the current Version of the Parameter Context. Each time that the Parameter Context is updated, its version is incremented. This allows
* other components to know whether or not the values have changed since some other point in time. The version may or may not be persisted across
* restarts of the application.
*
* @return the current version
*/
long getVersion();
ParameterLookup EMPTY = new ParameterLookup() { ParameterLookup EMPTY = new ParameterLookup() {
@Override @Override
@ -40,6 +49,11 @@ public interface ParameterLookup {
return Optional.empty(); return Optional.empty();
} }
@Override
public long getVersion() {
return 0;
}
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
return true; return true;

View File

@ -43,6 +43,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -480,12 +481,12 @@ public class TestLengthDelimitedJournal {
// the BADOS will sleep for 1 second before writing. This allwos other threads to trigger corruption in the repo in the meantime. // the BADOS will sleep for 1 second before writing. This allwos other threads to trigger corruption in the repo in the meantime.
final ByteArrayDataOutputStream pausingBados = new ByteArrayDataOutputStream(4096) { final ByteArrayDataOutputStream pausingBados = new ByteArrayDataOutputStream(4096) {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private int count = 0; private final AtomicInteger count = new AtomicInteger(0);
@Override @Override
public ByteArrayOutputStream getByteArrayOutputStream() { public ByteArrayOutputStream getByteArrayOutputStream() {
// Pause only on the second iteration. // Pause only on the second iteration.
if (count++ == 1) { if (count.getAndIncrement() == 1) {
try { try {
Thread.sleep(1000L); Thread.sleep(1000L);
} catch (final InterruptedException ie) { } catch (final InterruptedException ie) {
@ -503,11 +504,11 @@ public class TestLengthDelimitedJournal {
final Supplier<ByteArrayDataOutputStream> badosSupplier = new Supplier<ByteArrayDataOutputStream>() { final Supplier<ByteArrayDataOutputStream> badosSupplier = new Supplier<ByteArrayDataOutputStream>() {
private int count = 0; private final AtomicInteger count = new AtomicInteger(0);
@Override @Override
public ByteArrayDataOutputStream get() { public ByteArrayDataOutputStream get() {
if (count++ == 0) { if (count.getAndIncrement() == 0) {
return pausingBados; return pausingBados;
} }
@ -525,11 +526,11 @@ public class TestLengthDelimitedJournal {
final Thread[] threads = new Thread[2]; final Thread[] threads = new Thread[2];
final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, corruptingStreamPool, 0L) { final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, corruptingStreamPool, 0L) {
private int count = 0; private final AtomicInteger count = new AtomicInteger(0);
@Override @Override
protected void poison(final Throwable t) { protected void poison(final Throwable t) {
if (count++ == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down. if (count.getAndIncrement() == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down.
try { try {
Thread.sleep(3000L); Thread.sleep(3000L);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -52,7 +52,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
public class TestPutRecord { public class PutRecordIT {
public static final String DEFAULT_COLUMN_FAMILY = "family1"; public static final String DEFAULT_COLUMN_FAMILY = "family1";

View File

@ -54,7 +54,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
public class TestScanAccumulo { public class ScanAccumuloIT {
public static final String DEFAULT_COLUMN_FAMILY = "family1"; public static final String DEFAULT_COLUMN_FAMILY = "family1";

View File

@ -36,9 +36,12 @@ import org.apache.nifi.parameter.ExpressionLanguageAgnosticParameterParser;
import org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser; import org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser;
import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterParser; import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterReference; import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterTokenList; import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
@ -815,12 +818,110 @@ public abstract class AbstractComponentNode implements ComponentNode {
} }
private void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { protected void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) {
getComponent().onPropertyModified(descriptor, oldValue, newValue); getComponent().onPropertyModified(descriptor, oldValue, newValue);
} }
} }
@Override
public void onParametersModified(final Map<String, ParameterUpdate> updatedParameters) {
// If the component doesn't reference any parameters, then there's nothing to be done.
if (!isReferencingParameter()) {
return;
}
final ParameterLookup previousParameterLookup = createParameterLookupForPreviousValues(updatedParameters);
// For any Property that references an updated Parameter, we need to call onPropertyModified().
// Additionally, we need to trigger validation to run if this component is affected by the parameter update.
boolean componentAffected = false;
for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : properties.entrySet()) {
final PropertyDescriptor propertyDescriptor = entry.getKey();
final PropertyConfiguration configuration = entry.getValue();
// Determine if this property is affected by the Parameter Update
boolean propertyAffected = false;
final List<ParameterReference> parameterReferences = configuration.getParameterReferences();
for (final ParameterReference reference : parameterReferences) {
final String referencedParamName = reference.getParameterName();
if (updatedParameters.containsKey(referencedParamName)) {
propertyAffected = true;
componentAffected = true;
break;
}
}
if (propertyAffected) {
final String previousValue = configuration.getEffectiveValue(previousParameterLookup);
final String updatedValue = configuration.getEffectiveValue(getParameterContext());
// Check if the value of the property is truly affected. It's possible that we could have a property configured as something like "#{a}#{b}"
// Where parameter a = "abc-" and b = "cba". The update could change a to "abc" and b to "-cba". As a result, the property value previously was "abc-cba" and still is.
// In such a case, we should not call onPropertyModified.
final boolean propertyUpdated = !Objects.equals(previousValue, updatedValue);
if (propertyUpdated) {
try {
logger.debug("Parameter Context updated, resulting in property {} of {} changing. Calling onPropertyModified().", propertyDescriptor, this);
onPropertyModified(propertyDescriptor, previousValue, updatedValue);
} catch (final Exception e) {
// nothing really to do here...
logger.error("Failed to notify {} that property {} changed", this, propertyDescriptor, e);
}
} else {
logger.debug("Parameter Context updated, and property {} of {} does reference the updated Parameters. However, the overall property value remained unchanged so will not call " +
"onPropertyModified().", propertyDescriptor, this);
}
}
}
// If this component is affected by the Parameter change, we need to re-validate
if (componentAffected) {
logger.debug("Configuration of {} changed due to an update to Parameter Context. Resetting validation state", this);
resetValidationState();
}
}
private ParameterLookup createParameterLookupForPreviousValues(final Map<String, ParameterUpdate> updatedParameters) {
final ParameterContext currentContext = getParameterContext();
return new ParameterLookup() {
@Override
public Optional<Parameter> getParameter(final String parameterName) {
final Optional<Parameter> optionalParameter = currentContext == null ? Optional.empty() : currentContext.getParameter(parameterName);
// Check if there's an update to the parameter. If not, just return the parameter as-is.
final ParameterUpdate parameterUpdate = updatedParameters.get(parameterName);
if (parameterUpdate == null) {
return optionalParameter;
}
// There is an update to the parameter. We want to return the previous value of the Parameter.
final ParameterDescriptor parameterDescriptor;
if (optionalParameter.isPresent()) {
parameterDescriptor = optionalParameter.get().getDescriptor();
} else {
parameterDescriptor = new ParameterDescriptor.Builder()
.name(parameterName)
.description("")
.sensitive(true)
.build();
}
final Parameter updatedParameter = new Parameter(parameterDescriptor, parameterUpdate.getPreviousValue());
return Optional.of(updatedParameter);
}
@Override
public boolean isEmpty() {
return (currentContext == null || currentContext.isEmpty()) && updatedParameters.isEmpty();
}
@Override
public long getVersion() {
return 0;
}
};
}
@Override @Override
public ValidationStatus getValidationStatus() { public ValidationStatus getValidationStatus() {

View File

@ -34,6 +34,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState; import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import java.net.URL; import java.net.URL;
@ -71,6 +72,12 @@ public interface ComponentNode extends ComponentAuthorizable {
boolean isReferencingParameter(); boolean isReferencingParameter();
/**
* Notifies the Component that the value of a parameter has changed
* @param parameterUpdates a Map of Parameter name to a ParameterUpdate that describes how the Parameter changed
*/
void onParametersModified(Map<String, ParameterUpdate> parameterUpdates);
/** /**
* <p> * <p>
* Pause triggering asynchronous validation to occur when the component is updated. Often times, it is necessary * Pause triggering asynchronous validation to occur when the component is updated. Often times, it is necessary

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.nifi.controller; package org.apache.nifi.controller;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReference; import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterTokenList; import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.parameter.StandardParameterTokenList; import org.apache.nifi.parameter.StandardParameterTokenList;
@ -44,7 +44,7 @@ public class PropertyConfiguration {
return rawValue; return rawValue;
} }
public String getEffectiveValue(final ParameterContext parameterContext) { public String getEffectiveValue(final ParameterLookup parameterLookup) {
if (rawValue == null) { if (rawValue == null) {
return null; return null;
} }
@ -57,12 +57,12 @@ public class PropertyConfiguration {
// cache the Effective Value because we may have a different Parameter Context. So, we cache a Tuple of // cache the Effective Value because we may have a different Parameter Context. So, we cache a Tuple of
// the Parameter Context and the effective value for that Parameter Context. // the Parameter Context and the effective value for that Parameter Context.
final ComputedEffectiveValue computedEffectiveValue = effectiveValue.get(); final ComputedEffectiveValue computedEffectiveValue = effectiveValue.get();
if (computedEffectiveValue != null && computedEffectiveValue.matches(parameterContext)) { if (computedEffectiveValue != null && computedEffectiveValue.matches(parameterLookup)) {
return computedEffectiveValue.getValue(); return computedEffectiveValue.getValue();
} }
final String substituted = parameterTokenList.substitute(parameterContext); final String substituted = parameterTokenList.substitute(parameterLookup);
final ComputedEffectiveValue updatedValue = new ComputedEffectiveValue(parameterContext, substituted); final ComputedEffectiveValue updatedValue = new ComputedEffectiveValue(parameterLookup, substituted);
effectiveValue.compareAndSet(computedEffectiveValue, updatedValue); effectiveValue.compareAndSet(computedEffectiveValue, updatedValue);
return substituted; return substituted;
} }
@ -96,13 +96,13 @@ public class PropertyConfiguration {
public static class ComputedEffectiveValue { public static class ComputedEffectiveValue {
private final ParameterContext parameterContext; private final ParameterLookup parameterLookup;
private final long contextVersion; private final long contextVersion;
private final String value; private final String value;
public ComputedEffectiveValue(final ParameterContext parameterContext, final String value) { public ComputedEffectiveValue(final ParameterLookup parameterLookup, final String value) {
this.parameterContext = parameterContext; this.parameterLookup = parameterLookup;
this.contextVersion = parameterContext == null ? -1 : parameterContext.getVersion(); this.contextVersion = parameterLookup == null ? -1 : parameterLookup.getVersion();
this.value = value; this.value = value;
} }
@ -110,16 +110,16 @@ public class PropertyConfiguration {
return value; return value;
} }
public boolean matches(final ParameterContext context) { public boolean matches(final ParameterLookup parameterLookup) {
if (!Objects.equals(context, this.parameterContext)) { if (!Objects.equals(parameterLookup, this.parameterLookup)) {
return false; return false;
} }
if (context == null) { if (parameterLookup == null) {
return true; return true;
} }
return context.getVersion() == contextVersion; return parameterLookup.getVersion() == contextVersion;
} }
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClient;
@ -1043,6 +1044,8 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
/** /**
* Called to notify the Process Group whenever the Parameter Context that it is bound to has changed. * Called to notify the Process Group whenever the Parameter Context that it is bound to has changed.
*
* @param updatedParameters a Map of parameter name to the ParameterUpdate that describes how the Parameter was updated
*/ */
void onParameterContextUpdated(); void onParameterContextUpdated(Map<String, ParameterUpdate> updatedParameters);
} }

View File

@ -91,12 +91,5 @@ public interface ParameterContext extends ParameterLookup, ComponentAuthorizable
*/ */
ParameterReferenceManager getParameterReferenceManager(); ParameterReferenceManager getParameterReferenceManager();
/**
* Indicates the current Version of the Parameter Context. Each time that the Parameter Context is updated, its version is incremented. This allows
* other components to know whether or not the values have changed since some other point in time. The version may or may not be persisted across
* restarts of the application.
*
* @return the current version
*/
long getVersion();
} }

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parameter;
public interface ParameterUpdate {
/**
* @return the name of the Parameter
*/
String getParameterName();
/**
* @return the previous value of the Parameter, or <code>null</code> if the Parameter is being added or previously had no value
*/
String getPreviousValue();
/**
* @return the updated value of the Parameter, or <code>null</code> if the Parameter was removed or previously had no value
*/
String getUpdatedValue();
/**
* @return <code>true</code> if the Parameter is sensitive, <code>false</code> otherwise.
*/
boolean isSensitive();
}

View File

@ -17,25 +17,33 @@
package org.apache.nifi.controller; package org.apache.nifi.controller;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -51,6 +59,52 @@ public class TestAbstractComponentNode {
assertEquals(ValidationStatus.VALIDATING, status); assertEquals(ValidationStatus.VALIDATING, status);
} }
public void testOnParametersModified() {
final AtomicLong validationCount = new AtomicLong(0L);
final ValidationTrigger validationTrigger = new ValidationTrigger() {
@Override
public void triggerAsync(ComponentNode component) {
validationCount.incrementAndGet();
}
@Override
public void trigger(ComponentNode component) {
validationCount.incrementAndGet();
}
};
final List<PropertyModification> propertyModifications = new ArrayList<>();
final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode(0, validationTrigger) {
@Override
protected void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
propertyModifications.add(new PropertyModification(descriptor, oldValue, newValue));
super.onPropertyModified(descriptor, oldValue, newValue);
}
};
final Map<String, String> properties = new HashMap<>();
properties.put("abc", "#{abc}");
node.setProperties(properties);
final ParameterContext context = Mockito.mock(ParameterContext.class);
final ParameterDescriptor paramDescriptor = new ParameterDescriptor.Builder()
.name("abc")
.description("")
.sensitive(false)
.build();
final Parameter param = new Parameter(paramDescriptor, "123");
Mockito.doReturn(param).when(context).getParameter("abc");
final Map<String, ParameterUpdate> updatedParameters = new HashMap<>();
updatedParameters.put("abc", new MockParameterUpdate("abc", "xyz", "123", false));
node.onParametersModified(updatedParameters);
assertEquals(1, propertyModifications.size());
final PropertyModification mod = propertyModifications.get(0);
assertEquals("xyz", mod.getPreviousValue());
assertEquals("123", mod.getUpdatedValue());
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testValidationTriggerPaused() throws InterruptedException { public void testValidationTriggerPaused() throws InterruptedException {
final AtomicLong validationCount = new AtomicLong(0L); final AtomicLong validationCount = new AtomicLong(0L);
@ -84,6 +138,7 @@ public class TestAbstractComponentNode {
private static class ValidationControlledAbstractComponentNode extends AbstractComponentNode { private static class ValidationControlledAbstractComponentNode extends AbstractComponentNode {
private final long pauseMillis; private final long pauseMillis;
private volatile ParameterContext paramContext = null;
public ValidationControlledAbstractComponentNode(final long pauseMillis, final ValidationTrigger validationTrigger) { public ValidationControlledAbstractComponentNode(final long pauseMillis, final ValidationTrigger validationTrigger) {
super("id", Mockito.mock(ValidationContextFactory.class), Mockito.mock(ControllerServiceProvider.class), "unit test component", super("id", Mockito.mock(ValidationContextFactory.class), Mockito.mock(ControllerServiceProvider.class), "unit test component",
@ -168,7 +223,69 @@ public class TestAbstractComponentNode {
@Override @Override
protected ParameterContext getParameterContext() { protected ParameterContext getParameterContext() {
return null; return paramContext;
}
protected void setParameterContext(final ParameterContext parameterContext) {
this.paramContext = parameterContext;
}
}
private static class PropertyModification {
private final PropertyDescriptor propertyDescriptor;
private final String previousValue;
private final String updatedValue;
public PropertyModification(final PropertyDescriptor propertyDescriptor, final String previousValue, final String updatedValue) {
this.propertyDescriptor = propertyDescriptor;
this.previousValue = previousValue;
this.updatedValue = updatedValue;
}
public PropertyDescriptor getPropertyDescriptor() {
return propertyDescriptor;
}
public String getPreviousValue() {
return previousValue;
}
public String getUpdatedValue() {
return updatedValue;
}
}
private static class MockParameterUpdate implements ParameterUpdate {
private final String parameterName;
private final String oldValue;
private final String newValue;
private final boolean sensitive;
public MockParameterUpdate(final String parameterName, final String oldValue, final String newValue, final boolean sensitive) {
this.parameterName = parameterName;
this.oldValue = oldValue;
this.newValue = newValue;
this.sensitive = sensitive;
}
@Override
public String getParameterName() {
return parameterName;
}
@Override
public String getPreviousValue() {
return oldValue;
}
@Override
public String getUpdatedValue() {
return newValue;
}
@Override
public boolean isSensitive() {
return sensitive;
} }
} }
} }

View File

@ -75,6 +75,8 @@ import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReference; import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.parameter.StandardParameterUpdate;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
@ -156,6 +158,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -2911,32 +2914,91 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override @Override
public void setParameterContext(final ParameterContext parameterContext) { public void setParameterContext(final ParameterContext parameterContext) {
verifyCanSetParameterContext(parameterContext); verifyCanSetParameterContext(parameterContext);
// Determine which parameters have changed so that components can be appropriately updated.
final Map<String, ParameterUpdate> updatedParameters = mapParameterUpdates(this.parameterContext, parameterContext);
LOG.debug("Parameter Context for {} changed from {} to {}. This resulted in {} Parameter Updates ({}). Notifying Processors/Controller Services of the updates.",
this, this.parameterContext, parameterContext, updatedParameters.size(), updatedParameters);
this.parameterContext = parameterContext; this.parameterContext = parameterContext;
getProcessors().forEach(ProcessorNode::resetValidationState); if (!updatedParameters.isEmpty()) {
getControllerServices(false).forEach(ControllerServiceNode::resetValidationState); // Notify components that parameters have been updated
onParameterContextUpdated(updatedParameters);
}
} }
@Override @Override
public void onParameterContextUpdated() { public void onParameterContextUpdated(final Map<String, ParameterUpdate> updatedParameters) {
readLock.lock(); readLock.lock();
try { try {
for (final ProcessorNode processorNode : getProcessors()) { getProcessors().forEach(proc -> proc.onParametersModified(updatedParameters));
if (processorNode.isReferencingParameter() && processorNode.getScheduledState() != ScheduledState.RUNNING) { getControllerServices(false).forEach(cs -> cs.onParametersModified(updatedParameters));
processorNode.resetValidationState();
}
}
for (final ControllerServiceNode serviceNode : getControllerServices(false)) {
if (serviceNode.isReferencingParameter() && serviceNode.getState() == ControllerServiceState.DISABLING || serviceNode.getState() == ControllerServiceState.DISABLED) {
serviceNode.resetValidationState();
}
}
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
private Map<String, ParameterUpdate> mapParameterUpdates(final ParameterContext previousParameterContext, final ParameterContext updatedParameterContext) {
if (previousParameterContext == null && updatedParameterContext == null) {
return Collections.emptyMap();
}
if (updatedParameterContext == null) {
return createParameterUpdates(previousParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), value, null, descriptor.isSensitive()));
}
if (previousParameterContext == null) {
return createParameterUpdates(updatedParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), null, value, descriptor.isSensitive()));
}
// For each Parameter in the updated parameter context, add a ParameterUpdate to our map
final Map<String, ParameterUpdate> updatedParameters = new HashMap<>();
for (final Map.Entry<ParameterDescriptor, Parameter> entry : updatedParameterContext.getParameters().entrySet()) {
final ParameterDescriptor updatedDescriptor = entry.getKey();
final Parameter updatedParameter = entry.getValue();
final Optional<Parameter> previousParameterOption = previousParameterContext.getParameter(updatedDescriptor);
final String previousValue = previousParameterOption.map(Parameter::getValue).orElse(null);
final String updatedValue = updatedParameter.getValue();
if (!Objects.equals(previousValue, updatedValue)) {
final ParameterUpdate parameterUpdate = new StandardParameterUpdate(updatedDescriptor.getName(), previousValue, updatedValue, updatedDescriptor.isSensitive());
updatedParameters.put(updatedDescriptor.getName(), parameterUpdate);
}
}
// For each Parameter that was in the previous parameter context that is not in the updated Paramter Context, add a ParameterUpdate to our map with `null` for the updated value
for (final Map.Entry<ParameterDescriptor, Parameter> entry : previousParameterContext.getParameters().entrySet()) {
final ParameterDescriptor previousDescriptor = entry.getKey();
final Parameter previousParameter = entry.getValue();
final Optional<Parameter> updatedParameterOption = updatedParameterContext.getParameter(previousDescriptor);
if (updatedParameterOption.isPresent()) {
// The value exists in both Parameter Contexts. If it was changed, a Parameter Update has already been added to the map, above.
continue;
}
final ParameterUpdate parameterUpdate = new StandardParameterUpdate(previousDescriptor.getName(), previousParameter.getValue(), null, previousDescriptor.isSensitive());
updatedParameters.put(previousDescriptor.getName(), parameterUpdate);
}
return updatedParameters;
}
private Map<String, ParameterUpdate> createParameterUpdates(final ParameterContext parameterContext, final BiFunction<ParameterDescriptor, String, ParameterUpdate> parameterUpdateMapper) {
final Map<String, ParameterUpdate> updatedParameters = new HashMap<>();
for (final Map.Entry<ParameterDescriptor, Parameter> entry : parameterContext.getParameters().entrySet()) {
final ParameterDescriptor parameterDescriptor = entry.getKey();
final Parameter parameter = entry.getValue();
final ParameterUpdate parameterUpdate = parameterUpdateMapper.apply(parameterDescriptor, parameter.getValue());
updatedParameters.put(parameterDescriptor.getName(), parameterUpdate);
}
return updatedParameters;
}
@Override @Override
public void verifyCanSetParameterContext(final ParameterContext parameterContext) { public void verifyCanSetParameterContext(final ParameterContext parameterContext) {
readLock.lock(); readLock.lock();

View File

@ -30,6 +30,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -107,33 +108,44 @@ public class StandardParameterContext implements ParameterContext {
this.version++; this.version++;
verifyCanSetParameters(updatedParameters); verifyCanSetParameters(updatedParameters);
final Map<String, ParameterUpdate> parameterUpdates = new HashMap<>();
boolean changeAffectingComponents = false; boolean changeAffectingComponents = false;
for (final Map.Entry<String, Parameter> entry : updatedParameters.entrySet()) { for (final Map.Entry<String, Parameter> entry : updatedParameters.entrySet()) {
final String parameterName = entry.getKey(); final String parameterName = entry.getKey();
final Parameter parameter = entry.getValue(); final Parameter parameter = entry.getValue();
if (parameter == null) { if (parameter == null) {
final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder().name(parameterName).build();
parameters.remove(parameterDescriptor);
changeAffectingComponents = true; changeAffectingComponents = true;
final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder().name(parameterName).build();
final Parameter oldParameter = parameters.remove(parameterDescriptor);
parameterUpdates.put(parameterName, new StandardParameterUpdate(parameterName, oldParameter.getValue(), null, parameterDescriptor.isSensitive()));
} else { } else {
final Parameter updatedParameter = createFullyPopulatedParameter(parameter); final Parameter updatedParameter = createFullyPopulatedParameter(parameter);
final Parameter oldParameter = parameters.put(updatedParameter.getDescriptor(), updatedParameter); final Parameter oldParameter = parameters.put(updatedParameter.getDescriptor(), updatedParameter);
if (oldParameter == null || !Objects.equals(oldParameter.getValue(), updatedParameter.getValue())) { if (oldParameter == null || !Objects.equals(oldParameter.getValue(), updatedParameter.getValue())) {
changeAffectingComponents = true; changeAffectingComponents = true;
final String previousValue = oldParameter == null ? null : oldParameter.getValue();
parameterUpdates.put(parameterName, new StandardParameterUpdate(parameterName, previousValue, updatedParameter.getValue(), updatedParameter.getDescriptor().isSensitive()));
} }
} }
} }
if (changeAffectingComponents) { if (changeAffectingComponents) {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates);
for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) { for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) {
try { try {
processGroup.onParameterContextUpdated(); processGroup.onParameterContextUpdated(parameterUpdates);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e); logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e);
} }
} }
} else {
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). No existing components are affected.", this, parameterUpdates.size(), parameterUpdates);
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -324,6 +336,11 @@ public class StandardParameterContext implements ParameterContext {
} }
} }
@Override
public String toString() {
return "StandardParameterContext[name=" + name + "]";
}
@Override @Override
public Authorizable getParentAuthorizable() { public Authorizable getParentAuthorizable() {
return new Authorizable() { return new Authorizable() {

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parameter;
import java.util.Objects;
public class StandardParameterUpdate implements ParameterUpdate {
private final String parameterName;
private final String previousValue;
private final String updatedValue;
private final boolean sensitiveParameter;
public StandardParameterUpdate(final String parameterName, final String previousValue, final String updatedValue, final boolean sensitiveParameter) {
this.parameterName = parameterName;
this.previousValue = previousValue;
this.updatedValue = updatedValue;
this.sensitiveParameter = sensitiveParameter;
}
@Override
public String getParameterName() {
return parameterName;
}
@Override
public String getPreviousValue() {
return previousValue;
}
@Override
public String getUpdatedValue() {
return updatedValue;
}
@Override
public boolean isSensitive() {
return sensitiveParameter;
}
@Override
public String toString() {
if (sensitiveParameter) {
return "StandardParameterUpdate[parameterName=" + parameterName + ", sensitive=true]";
} else {
return "StandardParameterUpdate[parameterName=" + parameterName + ", sensitive=false, previous value='" + previousValue + "', updated value='" + updatedValue + "']";
}
}
@Override
public boolean equals(final Object o) {
if (this == o){
return true;
}
if (!(o instanceof StandardParameterUpdate)) {
return false;
}
final StandardParameterUpdate that = (StandardParameterUpdate) o;
return Objects.equals(parameterName, that.parameterName)
&& Objects.equals(previousValue, that.previousValue)
&& Objects.equals(updatedValue, that.updatedValue);
}
@Override
public int hashCode() {
return Objects.hash(parameterName, previousValue, updatedValue);
}
}

View File

@ -36,6 +36,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation;
@ -710,7 +711,7 @@ public class MockProcessGroup implements ProcessGroup {
} }
@Override @Override
public void onParameterContextUpdated() { public void onParameterContextUpdated(final Map<String, ParameterUpdate> updatedParameters) {
} }
@Override @Override