mirror of https://github.com/apache/nifi.git
NIFI-241: Only call Processor methods with @OnAdded annotation when a Processor is actually added to graph instead of also calling on NiFi restart
This commit is contained in:
parent
f36eea3701
commit
21e809a7cb
|
@ -756,7 +756,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Creates a new ProcessorNode with the given type and identifier.</p>
|
* Creates a new ProcessorNode with the given type and identifier and initializes it invoking the
|
||||||
|
* methods annotated with {@link OnAdded}.
|
||||||
|
* </p>
|
||||||
*
|
*
|
||||||
* @param type
|
* @param type
|
||||||
* @param id
|
* @param id
|
||||||
|
@ -766,6 +768,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
* instantiated for any reason
|
* instantiated for any reason
|
||||||
*/
|
*/
|
||||||
public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
|
public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
|
||||||
|
return createProcessor(type, id, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Creates a new ProcessorNode with the given type and identifier and optionally initializes it.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param type the fully qualified Processor class name
|
||||||
|
* @param id the unique ID of the Processor
|
||||||
|
* @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true},
|
||||||
|
* will invoke methods annotated with the {@link OnAdded} annotation.
|
||||||
|
* @return
|
||||||
|
* @throws NullPointerException if either arg is null
|
||||||
|
* @throws ProcessorInstantiationException if the processor cannot be
|
||||||
|
* instantiated for any reason
|
||||||
|
*/
|
||||||
|
public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
|
||||||
id = id.intern();
|
id = id.intern();
|
||||||
final Processor processor = instantiateProcessor(type, id);
|
final Processor processor = instantiateProcessor(type, id);
|
||||||
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
|
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
|
||||||
|
@ -774,12 +794,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
|
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
|
||||||
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
|
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
|
||||||
|
|
||||||
// TODO: We should only call this the first time that it is added to the graph....
|
if ( firstTimeAdded ) {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
|
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
|
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
|
||||||
throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
|
throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return procNode;
|
return procNode;
|
||||||
|
|
|
@ -594,7 +594,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||||
final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor");
|
final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor");
|
||||||
for (final Element processorElement : processorNodeList) {
|
for (final Element processorElement : processorNodeList) {
|
||||||
final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor);
|
final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor);
|
||||||
final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId());
|
final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), false);
|
||||||
processGroup.addProcessor(procNode);
|
processGroup.addProcessor(procNode);
|
||||||
updateProcessor(procNode, processorDTO, processGroup, controller);
|
updateProcessor(procNode, processorDTO, processGroup, controller);
|
||||||
}
|
}
|
||||||
|
|
|
@ -467,7 +467,7 @@ public final class FingerprintFactory {
|
||||||
Processor processor = null;
|
Processor processor = null;
|
||||||
try {
|
try {
|
||||||
if (controller != null) {
|
if (controller != null) {
|
||||||
processor = controller.createProcessor(className, UUID.randomUUID().toString()).getProcessor();
|
processor = controller.createProcessor(className, UUID.randomUUID().toString(), false).getProcessor();
|
||||||
}
|
}
|
||||||
} catch (ProcessorInstantiationException e) {
|
} catch (ProcessorInstantiationException e) {
|
||||||
logger.warn("Unable to create Processor of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", className, e.toString());
|
logger.warn("Unable to create Processor of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", className, e.toString());
|
||||||
|
|
Loading…
Reference in New Issue