NIFI-4: Added lifecycle annotation support

This commit is contained in:
Mark Payne 2015-01-16 15:52:47 -05:00
parent d734220d1e
commit d8e1f570a6
6 changed files with 238 additions and 37 deletions

View File

@ -16,8 +16,7 @@
*/
package org.apache.nifi.controller.service;
import java.util.Map;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.ControllerServiceLookup;
/**
@ -26,15 +25,15 @@ import org.apache.nifi.controller.ControllerServiceLookup;
public interface ControllerServiceProvider extends ControllerServiceLookup {
/**
* Gets the controller service for the specified identifier. Returns null if
* the identifier does not match a known service.
* Creates a new Controller Service of the given type and assigns it the given id. If <code>firstTimeadded</code>
* is true, calls any methods that are annotated with {@link OnAdded}
*
* @param type
* @param id
* @param properties
* @param firstTimeAdded
* @return
*/
ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties);
ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded);
/**
* Gets the controller service node for the specified identifier. Returns
@ -44,4 +43,14 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
* @return
*/
ControllerServiceNode getControllerServiceNode(String id);
/**
* Removes the given Controller Service from the flow. This will call all appropriate methods
* that have the @OnRemoved annotation.
*
* @param serviceNode the controller service to remove
*
* @throws IllegalStateException if the controller service is not disabled or is not a part of this flow
*/
void removeControllerService(ControllerServiceNode serviceNode);
}

View File

@ -50,6 +50,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.DataFlow;
@ -134,6 +135,7 @@ import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.Processor;
@ -2463,6 +2465,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
public ReportingTaskNode createReportingTask(final String type, String id) throws ReportingTaskInstantiationException {
return createReportingTask(type, id, true);
}
public ReportingTaskNode createReportingTask(final String type, String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {
throw new NullPointerException();
}
@ -2484,7 +2490,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
final Object reportingTaskObj = reportingTaskClass.newInstance();
task = reportingTaskClass.cast(reportingTaskObj);
} catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
throw new ReportingTaskInstantiationException(type, t);
} finally {
@ -2495,6 +2500,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
if ( firstTimeAdded ) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
}
}
reportingTasks.put(id, taskNode);
return taskNode;
}
@ -2519,13 +2533,45 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.unschedule(reportingTaskNode);
}
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
if ( existing == null || existing != reportingTaskNode ) {
throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
}
reportingTaskNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
}
Collection<ReportingTaskNode> getReportingTasks() {
return reportingTasks.values();
}
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
processScheduler.enableReportingTask(reportingTaskNode);
}
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
processScheduler.disableReportingTask(reportingTaskNode);
}
public void enableControllerService(final ControllerServiceNode serviceNode) {
processScheduler.enableControllerService(serviceNode);
}
public void disableControllerService(final ControllerServiceNode serviceNode) {
processScheduler.disableControllerService(serviceNode);
}
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
return controllerServiceProvider.createControllerService(type, id.intern(), properties);
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id.intern(), firstTimeAdded);
}
@Override
@ -2548,6 +2594,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
//
// Counters
//

View File

@ -27,6 +27,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -41,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
@ -513,14 +516,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
((AbstractPort) port).disable();
}
@Override
public synchronized void disableProcessor(final ProcessorNode procNode) {
if (procNode.getScheduledState() != ScheduledState.STOPPED) {
throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
}
procNode.setScheduledState(ScheduledState.DISABLED);
}
@Override
public synchronized void enablePort(final Port port) {
if (port.getScheduledState() != ScheduledState.DISABLED) {
@ -539,9 +534,84 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (procNode.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
}
procNode.setScheduledState(ScheduledState.STOPPED);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
}
}
@Override
public synchronized void disableProcessor(final ProcessorNode procNode) {
if (procNode.getScheduledState() != ScheduledState.STOPPED) {
throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
}
procNode.setScheduledState(ScheduledState.DISABLED);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
}
}
public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
}
taskNode.setScheduledState(ScheduledState.STOPPED);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
}
}
public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
}
taskNode.setScheduledState(ScheduledState.DISABLED);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
}
}
public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
if ( !serviceNode.isDisabled() ) {
throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
}
// we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be
// done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service
// is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled
// before annotated methods.
serviceNode.setDisabled(false);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
}
}
public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
if ( serviceNode.isDisabled() ) {
throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
}
// We must set the service to disabled before we invoke the OnDisabled methods because the service node
// can throw Exceptions if we attempt to disable the service while it's known to be in use.
serviceNode.setDisabled(true);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
}
}
@Override
public boolean isScheduled(final Object scheduled) {
final ScheduleState scheduleState = scheduleStates.get(scheduled);
@ -549,7 +619,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
/**
* Returns the ScheduleState that is registered for the given ProcessorNode;
* Returns the ScheduleState that is registered for the given component;
* if no ScheduleState current is registered, one is created and registered
* atomically, and then that value is returned.
*

View File

@ -30,17 +30,20 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,7 +96,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
if (type == null || id == null) {
throw new NullPointerException();
}
@ -139,15 +142,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
serviceNodeHolder.set(serviceNode);
serviceNode.setAnnotationData(null);
serviceNode.setName(id);
for (final Map.Entry<String, String> entry : properties.entrySet()) {
serviceNode.setProperty(entry.getKey(), entry.getValue());
if ( firstTimeAdded ) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
}
}
final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext);
this.controllerServices.put(id, serviceNode);
return serviceNode;
@ -163,7 +169,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
return (node == null) ? null : node.getControllerService();
return (node == null) ? null : node.getProxiedControllerService();
}
@Override
@ -186,11 +192,28 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
final Set<String> identifiers = new HashSet<>();
for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) {
if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
identifiers.add(entry.getKey());
}
}
return identifiers;
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
if ( existing == null || existing != serviceNode ) {
throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
}
serviceNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
}
controllerServices.remove(serviceNode.getIdentifier());
}
}

View File

@ -46,11 +46,11 @@ public class StandardSchedulingContext implements SchedulingContext {
}
if (serviceNode.isDisabled()) {
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is currently disabled");
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
}
if (!serviceNode.isValid()) {
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is not currently valid");
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
}
serviceNode.addReference(processorNode);

View File

@ -22,6 +22,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -148,7 +149,28 @@ public class ReflectionUtils {
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) {
return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, args);
return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, null, args);
}
/**
* Invokes all methods on the given instance that have been annotated with
* the given Annotation. If the signature of the method that is defined in
* <code>instance</code> uses 1 or more parameters, those parameters must be
* specified by the <code>args</code> parameter. However, if more arguments
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
* @param annotation
* @param instance
* @param args
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final ProcessorLog logger, final Object... args) {
return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, logger, args);
}
@ -165,13 +187,15 @@ public class ReflectionUtils {
* @param preferredAnnotation
* @param alternateAnnotation
* @param instance
* @param logger the ProcessorLog to use for logging any errors. If null, will use own logger, but that will not generate bulletins
* or easily tie to the Processor's log messages.
* @param args
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) {
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final ProcessorLog logger, final Object... args) {
final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2);
annotationClasses.add(preferredAnnotation);
if ( alternateAnnotation != null ) {
@ -194,16 +218,28 @@ public class ReflectionUtils {
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
if ( logger == null ) {
LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
} else {
logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
}
return false;
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
if ( logger == null ) {
LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
} else {
logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
}
return false;
}
}
@ -219,9 +255,21 @@ public class ReflectionUtils {
method.invoke(instance, argsToPass);
}
} catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
LOG.error("", t);
} catch (final InvocationTargetException ite) {
if ( logger == null ) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
LOG.error("", ite.getCause());
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
}
} catch (final IllegalAccessException | IllegalArgumentException t) {
if ( logger == null ) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
LOG.error("", t);
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
}
return false;
}
} finally {