mirror of https://github.com/apache/nifi.git
NIFI-3269 Added call to onPropertyModified when updating controller services in StandardProcessorTestRunner.
Added unit test to demonstrate change. This closes #1374. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
2c0f1c348e
commit
41189b055f
|
@ -754,9 +754,14 @@ public class StandardProcessorTestRunner implements TestRunner {
|
||||||
final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager, variableRegistry).getControllerServiceValidationContext(service);
|
final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager, variableRegistry).getControllerServiceValidationContext(service);
|
||||||
final ValidationResult validationResult = property.validate(value, validationContext);
|
final ValidationResult validationResult = property.validate(value, validationContext);
|
||||||
|
|
||||||
|
final String oldValue = updatedProps.get(property);
|
||||||
updatedProps.put(property, value);
|
updatedProps.put(property, value);
|
||||||
configuration.setProperties(updatedProps);
|
configuration.setProperties(updatedProps);
|
||||||
|
|
||||||
|
if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) {
|
||||||
|
service.onPropertyModified(property, oldValue, value);
|
||||||
|
}
|
||||||
|
|
||||||
return validationResult;
|
return validationResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,16 +17,24 @@
|
||||||
package org.apache.nifi.util;
|
package org.apache.nifi.util;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -34,6 +42,7 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestStandardProcessorTestRunner {
|
public class TestStandardProcessorTestRunner {
|
||||||
|
@ -70,7 +79,7 @@ public class TestStandardProcessorTestRunner {
|
||||||
runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1);
|
||||||
|
|
||||||
runner.assertAllConditionsMet("success",
|
runner.assertAllConditionsMet("success",
|
||||||
mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && mff.isContentEqual("1,hello\n1,good-bye")
|
mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && mff.isContentEqual("1,hello\n1,good-bye")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,6 +176,37 @@ public class TestStandardProcessorTestRunner {
|
||||||
assertNull(runner.getVariableValue("hello"));
|
assertNull(runner.getVariableValue("hello"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testControllerServiceUpdateShouldCallOnSetProperty() {
|
||||||
|
// Arrange
|
||||||
|
final ControllerService testService = new SimpleTestService();
|
||||||
|
final AddAttributeProcessor proc = new AddAttributeProcessor();
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
final String serviceIdentifier = "test";
|
||||||
|
final String pdName = "name";
|
||||||
|
final String pdValue = "exampleName";
|
||||||
|
try {
|
||||||
|
runner.addControllerService(serviceIdentifier, testService);
|
||||||
|
} catch (InitializationException e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse("onPropertyModified has been called", ((SimpleTestService) testService).isOpmCalled());
|
||||||
|
|
||||||
|
// Act
|
||||||
|
ValidationResult vr = runner.setProperty(testService, pdName, pdValue);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
assertTrue(vr.isValid());
|
||||||
|
|
||||||
|
ControllerServiceConfiguration csConf = ((MockProcessContext) runner.getProcessContext()).getConfiguration(serviceIdentifier);
|
||||||
|
PropertyDescriptor propertyDescriptor = testService.getPropertyDescriptor(pdName);
|
||||||
|
String retrievedPDValue = csConf.getProperties().get(propertyDescriptor);
|
||||||
|
|
||||||
|
assertEquals(pdValue, retrievedPDValue);
|
||||||
|
assertTrue("onPropertyModified has not been called", ((SimpleTestService) testService).isOpmCalled());
|
||||||
|
}
|
||||||
|
|
||||||
private static class ProcessorWithOnStop extends AbstractProcessor {
|
private static class ProcessorWithOnStop extends AbstractProcessor {
|
||||||
|
|
||||||
private int callsWithContext = 0;
|
private int callsWithContext = 0;
|
||||||
|
@ -220,7 +260,7 @@ public class TestStandardProcessorTestRunner {
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
FlowFile ff = session.create();
|
FlowFile ff = session.create();
|
||||||
if(counter % 2 == 0) {
|
if (counter % 2 == 0) {
|
||||||
ff = session.putAttribute(ff, KEY, "value");
|
ff = session.putAttribute(ff, KEY, "value");
|
||||||
session.transfer(ff, REL_SUCCESS);
|
session.transfer(ff, REL_SUCCESS);
|
||||||
} else {
|
} else {
|
||||||
|
@ -233,14 +273,14 @@ public class TestStandardProcessorTestRunner {
|
||||||
private static class GoodProcessor extends AbstractProcessor {
|
private static class GoodProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("Successfully created FlowFile from ...")
|
.description("Successfully created FlowFile from ...")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
.name("failure")
|
.name("failure")
|
||||||
.description("... execution failed. Incoming FlowFile will be penalized and routed to this relationship")
|
.description("... execution failed. Incoming FlowFile will be penalized and routed to this relationship")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private final Set<Relationship> relationships;
|
private final Set<Relationship> relationships;
|
||||||
|
|
||||||
|
@ -259,9 +299,35 @@ public class TestStandardProcessorTestRunner {
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
|
|
||||||
for( FlowFile incoming : session.get(20)) {
|
for (FlowFile incoming : session.get(20)) {
|
||||||
session.transfer(incoming, REL_SUCCESS);
|
session.transfer(incoming, REL_SUCCESS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class SimpleTestService extends AbstractControllerService {
|
||||||
|
private final String PD_NAME = "name";
|
||||||
|
private PropertyDescriptor namePropertyDescriptor = new PropertyDescriptor.Builder()
|
||||||
|
.name(PD_NAME)
|
||||||
|
.displayName("Controller Service Name")
|
||||||
|
.required(false)
|
||||||
|
.sensitive(false)
|
||||||
|
.allowableValues("exampleName", "anotherExampleName")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private boolean opmCalled = false;
|
||||||
|
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return Arrays.asList(namePropertyDescriptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||||
|
getLogger().info("onPropertyModified called for PD {} with old value {} and new value {}", new Object[]{descriptor.getName(), oldValue, newValue});
|
||||||
|
opmCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isOpmCalled() {
|
||||||
|
return opmCalled;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue