diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java index eeebeae76c..99e06ed053 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java @@ -262,4 +262,9 @@ public interface Connectable extends Triggerable, Authorizable, Positionable { void verifyCanClearState() throws IllegalStateException; SchedulingStrategy getSchedulingStrategy(); + + /** + * @return the type of the component. I.e., the class name of the implementation + */ + String getComponentType(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index be4a198617..84610d112d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -46,17 +46,22 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final ControllerServiceProvider serviceProvider; private final AtomicReference name; private final AtomicReference annotationData = new AtomicReference<>(); + private final String componentType; + private final String componentCanonicalClass; private final Lock lock = new ReentrantLock(); private final ConcurrentMap properties = new ConcurrentHashMap<>(); public AbstractConfiguredComponent(final ConfigurableComponent component, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass) { this.id = id; this.component = component; this.validationContextFactory = validationContextFactory; this.serviceProvider = serviceProvider; this.name = new AtomicReference<>(component.getClass().getSimpleName()); + this.componentType = componentType; + this.componentCanonicalClass = componentCanonicalClass; } @Override @@ -308,4 +313,14 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone ControllerServiceProvider getControllerServiceProvider() { return this.serviceProvider; } + + @Override + public String getCanonicalClassName() { + return componentCanonicalClass; + } + + @Override + public String getComponentType() { + return componentType; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 398e4fb375..f900bedb9e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -16,6 +16,22 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.nifi.authorization.Resource; @@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; - public abstract class AbstractPort implements Port { public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java index d0534668e3..f1ee11e371 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java @@ -65,4 +65,14 @@ public interface ConfiguredComponent extends Authorizable { * @return the any validation errors for this connectable */ Collection getValidationErrors(); + + /** + * @return the type of the component. I.e., the class name of the implementation + */ + String getComponentType(); + + /** + * @return the Canonical Class Name of the component + */ + String getCanonicalClassName(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 9929914324..29c2cef3a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -42,8 +42,9 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen protected final AtomicReference scheduledState; public ProcessorNode(final Processor processor, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(processor, id, validationContextFactory, serviceProvider); + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass) { + super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass); this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index 289f52aff3..ca69316660 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -546,4 +546,9 @@ public class StandardFunnel implements Funnel { public SchedulingStrategy getSchedulingStrategy() { return SchedulingStrategy.TIMER_DRIVEN; } + + @Override + public String getComponentType() { + return "Funnel"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index 9432d6c428..fcf6b2d5e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -166,4 +166,9 @@ public class LocalPort extends AbstractPort { public boolean isSideEffectFree() { return true; } + + @Override + public String getComponentType() { + return "Local Port"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 162035f29e..f6388201c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -174,6 +174,7 @@ import org.apache.nifi.logging.ReportingTaskLogObserver; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarThreadContextClassLoader; +import org.apache.nifi.processor.GhostProcessor; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; @@ -197,6 +198,7 @@ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.GhostReportingTask; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; @@ -965,9 +967,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @SuppressWarnings("deprecation") public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { id = id.intern(); - final Processor processor = instantiateProcessor(type, id); + + boolean creationSuccessful; + Processor processor; + try { + processor = instantiateProcessor(type, id); + creationSuccessful = true; + } catch (final ProcessorInstantiationException pie) { + LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie); + final GhostProcessor ghostProc = new GhostProcessor(); + ghostProc.setIdentifier(id); + ghostProc.setCanonicalClassName(type); + processor = ghostProc; + creationSuccessful = false; + } + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); - final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + final ProcessorNode procNode; + if (creationSuccessful) { + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + } else { + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type); + } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); @@ -2456,7 +2479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setId(procNode.getIdentifier()); status.setGroupId(procNode.getProcessGroup().getIdentifier()); status.setName(procNode.getName()); - status.setType(procNode.getProcessor().getClass().getSimpleName()); + status.setType(procNode.getComponentType()); final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier()); if (entry == null) { @@ -2619,6 +2642,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } ReportingTask task = null; + boolean creationSuccessful = true; final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type); @@ -2633,8 +2657,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final Class reportingTaskClass = rawClass.asSubclass(ReportingTask.class); final Object reportingTaskObj = reportingTaskClass.newInstance(); task = reportingTaskClass.cast(reportingTaskObj); - } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) { - throw new ReportingTaskInstantiationException(type, t); + } catch (final Exception e) { + LOG.error("Could not create Reporting Task of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e); + final GhostReportingTask ghostTask = new GhostReportingTask(); + ghostTask.setIdentifier(id); + ghostTask.setCanonicalClassName(type); + task = ghostTask; + creationSuccessful = false; } finally { if (ctxClassLoader != null) { Thread.currentThread().setContextClassLoader(ctxClassLoader); @@ -2642,7 +2671,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); - final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory); + final ReportingTaskNode taskNode; + if (creationSuccessful) { + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory); + } else { + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type); + } + taskNode.setName(task.getClass().getSimpleName()); if (firstTimeAdded) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 2be0dcf65c..f2a7712153 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -111,10 +111,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicReference comments; private final AtomicReference position; private final AtomicReference annotationData; - private final AtomicReference schedulingPeriod; // stored as string - // so it's presented - // to user as they - // entered it + private final AtomicReference schedulingPeriod; // stored as string so it's presented to user as they entered it private final AtomicReference yieldPeriod; private final AtomicReference penalizationPeriod; private final AtomicReference> style; @@ -135,11 +132,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more + public StandardProcessorNode(final Processor processor, final String uuid, + final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, + final ControllerServiceProvider controllerServiceProvider) { + + this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, + processor.getClass().getSimpleName(), processor.getClass().getCanonicalName()); + } + @SuppressWarnings("deprecation") public StandardProcessorNode(final Processor processor, final String uuid, - final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, - final ControllerServiceProvider controllerServiceProvider) { - super(processor, uuid, validationContextFactory, controllerServiceProvider); + final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, + final ControllerServiceProvider controllerServiceProvider, + final String componentType, final String componentCanonicalClass) { + + super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); this.processor = processor; identifier = new AtomicReference<>(uuid); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 6aead13739..9117a16260 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -53,10 +53,21 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, + final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, + final ValidationContextFactory validationContextFactory) { + + this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, + reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName()); + } + + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory) { - super(reportingTask, id, validationContextFactory, controllerServiceProvider); + final ValidationContextFactory validationContextFactory, + final String componentType, final String componentCanonicalClass) { + + super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); this.reportingTask = reportingTask; this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index f51a9def89..539ada14a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -37,6 +37,13 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme this.flowController = controller; } + public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, + final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, + final String componentType, final String canonicalClassName) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName); + this.flowController = controller; + } + @Override public Authorizable getParentAuthorizable() { return flowController; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java index 904b330619..4d0d850716 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java @@ -125,13 +125,7 @@ public class ProcessContext { void adjustCounter(final String name, final long delta) { final String localContext = connectable.getName() + " (" + connectable.getIdentifier() + ")"; - final String globalContext; - - if (connectable instanceof ProcessorNode) { - globalContext = "All " + ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName() + "'s"; - } else { - globalContext = "All " + connectable.getClass().getSimpleName() + "'s"; - } + final String globalContext = "All " + connectable.getComponentType() + "'s"; counterRepo.adjustCounter(localContext, name, delta); counterRepo.adjustCounter(globalContext, name, delta); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 0071fedc71..426775e4af 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -149,7 +149,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE switch (connectable.getConnectableType()) { case PROCESSOR: final ProcessorNode procNode = (ProcessorNode) connectable; - componentType = procNode.getProcessor().getClass().getSimpleName(); + componentType = procNode.getComponentType(); description = procNode.getProcessor().toString(); break; case INPUT_PORT: @@ -1394,12 +1394,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE eventBuilder.setComponentId(context.getConnectable().getIdentifier()); final Connectable connectable = context.getConnectable(); - final String processorType; - if (connectable instanceof ProcessorNode) { - processorType = ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName(); - } else { - processorType = connectable.getClass().getSimpleName(); - } + final String processorType = connectable.getComponentType(); eventBuilder.setComponentType(processorType); eventBuilder.addParentFlowFile(parent); @@ -1684,15 +1679,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()}); final List expiredRecords = new ArrayList<>(flowFiles.size()); - final String processorType; final Connectable connectable = context.getConnectable(); - if (connectable instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) connectable; - processorType = procNode.getProcessor().getClass().getSimpleName(); - } else { - processorType = connectable.getClass().getSimpleName(); - } - + final String processorType = connectable.getComponentType(); final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), processorType, context.getProvenanceRepository(), this); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 4d0cd9dac6..e26f5501bc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -319,7 +319,7 @@ public class StandardFlowSerializer implements FlowSerializer { addStyle(element, processor.getStyle()); addTextElement(element, "comment", processor.getComments()); - addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName()); + addTextElement(element, "class", processor.getCanonicalClassName()); addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks()); addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod()); addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod()); @@ -428,7 +428,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(serviceElement, "id", serviceNode.getIdentifier()); addTextElement(serviceElement, "name", serviceNode.getName()); addTextElement(serviceElement, "comment", serviceNode.getComments()); - addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName()); + addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName()); final ControllerServiceState state = serviceNode.getState(); final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); @@ -444,7 +444,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(taskElement, "id", taskNode.getIdentifier()); addTextElement(taskElement, "name", taskNode.getName()); addTextElement(taskElement, "comment", taskNode.getComments()); - addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName()); + addTextElement(taskElement, "class", taskNode.getCanonicalClassName()); addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod()); addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name()); addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index f936f326c8..6e29053743 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -73,7 +73,16 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(implementation, id, validationContextFactory, serviceProvider); + + this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider, + implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName()); + } + + public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass) { + + super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass); this.proxedControllerService = proxiedControllerService; this.implementation = implementation; this.serviceProvider = serviceProvider; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 05f5036245..425ee4012f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -32,9 +32,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ConfiguredComponent; @@ -129,11 +132,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi try { final ClassLoader cl = ExtensionManager.getClassLoader(type); final Class rawClass; - if (cl == null) { - rawClass = Class.forName(type); - } else { - Thread.currentThread().setContextClassLoader(cl); - rawClass = Class.forName(type, false, cl); + + try { + if (cl == null) { + rawClass = Class.forName(type); + } else { + Thread.currentThread().setContextClassLoader(cl); + rawClass = Class.forName(type, false, cl); + } + } catch (final Exception e) { + logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e); + Thread.currentThread().setContextClassLoader(currentContextClassLoader); + return createGhostControllerService(type, id); } final Class controllerServiceClass = rawClass.asSubclass(ControllerService.class); @@ -206,6 +216,57 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + private ControllerServiceNode createGhostControllerService(final String type, final String id) { + final InvocationHandler invocationHandler = new InvocationHandler() { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + final String methodName = method.getName(); + + if ("validate".equals(methodName)) { + final ValidationResult result = new ValidationResult.Builder() + .input("Any Property") + .subject("Missing Controller Service") + .valid(false) + .explanation("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found") + .build(); + return Collections.singleton(result); + } else if ("getPropertyDescriptor".equals(methodName)) { + final String propertyName = (String) args[0]; + return new PropertyDescriptor.Builder() + .name(propertyName) + .description(propertyName) + .sensitive(true) + .required(true) + .build(); + } else if ("getPropertyDescriptors".equals(methodName)) { + return Collections.emptyList(); + } else if ("onPropertyModified".equals(methodName)) { + return null; + } else if ("getIdentifier".equals(methodName)) { + return id; + } else if ("toString".equals(methodName)) { + return "GhostControllerService[id=" + id + ", type=" + type + "]"; + } else if ("hashCode".equals(methodName)) { + return 91 * type.hashCode() + 41 * id.hashCode(); + } else if ("equals".equals(methodName)) { + return proxy == args[0]; + } else { + throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found"); + } + } + }; + + final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), + new Class[] {ControllerService.class}, invocationHandler); + + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id, + new StandardValidationContextFactory(this), this, componentType, type); + return serviceNode; + } + @Override public Set disableReferencingServices(final ControllerServiceNode serviceNode) { // Get a list of all Controller Services that need to be disabled, in the order that they need to be @@ -344,7 +405,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } final Set enabledNodes = Collections.synchronizedSet(new HashSet()); - final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size())); + final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("Enable Controller Services"); + return t; + } + }); + for (final List branch : branches) { final Runnable enableBranchRunnable = new Runnable() { @Override @@ -358,6 +428,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi logger.info("Enabling {}", serviceNode); try { + serviceNode.verifyCanEnable(); processScheduler.enableControllerService(serviceNode); } catch (final Exception e) { logger.error("Failed to enable " + serviceNode + " due to " + e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java new file mode 100644 index 0000000000..f6fc2da66d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processor; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.exception.ProcessException; + +public class GhostProcessor implements Processor { + private String id; + private String canonicalClassName; + + public void setIdentifier(final String id) { + this.id = id; + } + + public void setCanonicalClassName(final String canonicalClassName) { + this.canonicalClassName = canonicalClassName; + } + + @Override + public Collection validate(final ValidationContext context) { + return Collections.singleton(new ValidationResult.Builder() + .input("Any Property") + .subject("Missing Processor") + .valid(false) + .explanation("Processor is of type " + canonicalClassName + ", but this is not a valid Processor type") + .build()); + } + + @Override + public PropertyDescriptor getPropertyDescriptor(final String name) { + return buildDescriptor(name); + } + + private PropertyDescriptor buildDescriptor(final String propertyName) { + return new PropertyDescriptor.Builder() + .name(propertyName) + .description(propertyName) + .required(true) + .sensitive(true) + .build(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, String oldValue, String newValue) { + } + + @Override + public List getPropertyDescriptors() { + return Collections.emptyList(); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public void initialize(final ProcessorInitializationContext context) { + } + + @Override + public Set getRelationships() { + return Collections.emptySet(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + // Should never get this far. + throw new ProcessException("Unable to instantiate Processor class"); + } + + @Override + public String toString() { + return "GhostProcessor[id=" + id + ", class=" + canonicalClassName + "]"; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java new file mode 100644 index 0000000000..c0e55e7a28 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.exception.ProcessException; + +public class GhostReportingTask implements ReportingTask { + + private String id; + private String canonicalClassName; + + public void setIdentifier(final String id) { + this.id = id; + } + + public void setCanonicalClassName(final String canonicalClassName) { + this.canonicalClassName = canonicalClassName; + } + + @Override + public Collection validate(final ValidationContext context) { + return Collections.singleton(new ValidationResult.Builder() + .input("Any Property") + .subject("Missing Reporting Task") + .valid(false) + .explanation("Reporting Task is of type " + canonicalClassName + ", but this is not a valid Reporting Task type") + .build()); + } + + @Override + public PropertyDescriptor getPropertyDescriptor(final String name) { + return buildDescriptor(name); + } + + private PropertyDescriptor buildDescriptor(final String propertyName) { + return new PropertyDescriptor.Builder() + .name(propertyName) + .description(propertyName) + .required(true) + .sensitive(true) + .build(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, String oldValue, String newValue) { + } + + @Override + public List getPropertyDescriptors() { + return Collections.emptyList(); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String toString() { + return "GhostReportingTask[id=" + id + ", class=" + canonicalClassName + "]"; + } + + @Override + public void initialize(ReportingInitializationContext config) throws InitializationException { + } + + @Override + public void onTrigger(ReportingContext context) { + throw new ProcessException("Unable to instantiate ReportingTask class"); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java new file mode 100644 index 0000000000..09821e96e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.admin.service.KeyService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.MockProvenanceEventRepository; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.util.NiFiProperties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFlowController { + + private FlowController controller; + + @Before + public void setup() { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + + final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); + final KeyService keyService = Mockito.mock(KeyService.class); + final AuditService auditService = Mockito.mock(AuditService.class); + final StringEncryptor encryptor = StringEncryptor.createEncryptor(); + final NiFiProperties properties = NiFiProperties.getInstance(); + properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceEventRepository.class.getName()); + properties.setProperty("nifi.remote.input.socket.port", ""); + properties.setProperty("nifi.remote.input.secure", ""); + + final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, keyService, auditService, encryptor, bulletinRepo); + } + + @After + public void cleanup() { + controller.shutdown(true); + } + + @Test + public void testCreateMissingProcessor() throws ProcessorInstantiationException { + final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor"); + assertNotNull(procNode); + assertEquals("org.apache.nifi.NonExistingProcessor", procNode.getCanonicalClassName()); + assertEquals("(Missing) NonExistingProcessor", procNode.getComponentType()); + + final PropertyDescriptor descriptor = procNode.getPropertyDescriptor("my descriptor"); + assertNotNull(descriptor); + assertEquals("my descriptor", descriptor.getName()); + assertTrue(descriptor.isRequired()); + assertTrue(descriptor.isSensitive()); + + final Relationship relationship = procNode.getRelationship("my relationship"); + assertEquals("my relationship", relationship.getName()); + } + + @Test + public void testCreateMissingReportingTask() throws ReportingTaskInstantiationException { + final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task", true); + assertNotNull(taskNode); + assertEquals("org.apache.nifi.NonExistingReportingTask", taskNode.getCanonicalClassName()); + assertEquals("(Missing) NonExistingReportingTask", taskNode.getComponentType()); + + final PropertyDescriptor descriptor = taskNode.getReportingTask().getPropertyDescriptor("my descriptor"); + assertNotNull(descriptor); + assertEquals("my descriptor", descriptor.getName()); + assertTrue(descriptor.isRequired()); + assertTrue(descriptor.isSensitive()); + } + + @Test + public void testCreateMissingControllerService() throws ProcessorInstantiationException { + final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service", false); + assertNotNull(serviceNode); + assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName()); + assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType()); + + final ControllerService service = serviceNode.getControllerServiceImplementation(); + final PropertyDescriptor descriptor = service.getPropertyDescriptor("my descriptor"); + assertNotNull(descriptor); + assertEquals("my descriptor", descriptor.getName()); + assertTrue(descriptor.isRequired()); + assertTrue(descriptor.isSensitive()); + assertEquals("GhostControllerService[id=1234-Controller-Service, type=org.apache.nifi.NonExistingControllerService]", service.toString()); + service.hashCode(); // just make sure that an Exception is not thrown + assertTrue(service.equals(service)); + assertFalse(service.equals(serviceNode)); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 644018fd15..e418fa4f8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -174,6 +174,7 @@ public class TestStandardProcessSession { when(connectable.getProcessGroup()).thenReturn(procGroup); when(connectable.getIdentifier()).thenReturn("connectable-1"); when(connectable.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT); + when(connectable.getComponentType()).thenReturn("Unit Test Component"); Mockito.doAnswer(new Answer>() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties index 314304fa21..fc29cd714a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties @@ -30,6 +30,9 @@ nifi.ui.autorefresh.interval=30 sec nifi.nar.library.directory=./target/lib nifi.nar.working.directory=./target/work/nar/ +nifi.state.management.configuration.file=src/test/resources/state-management.xml +nifi.state.management.provider.local=local-provider + # H2 Settings nifi.database.directory=./database_repository nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index b44f562ebb..02a44b70b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -455,4 +455,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { public boolean isSideEffectFree() { return false; } + + @Override + public String getComponentType() { + return "RemoteGroupPort"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 2e5f175c11..7507935deb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -567,4 +567,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort public boolean isSideEffectFree() { return false; } + + @Override + public String getComponentType() { + return "RootGroupPort"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ComponentStateAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ComponentStateAuditor.java index 8e75cdcbea..e231456fcd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ComponentStateAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ComponentStateAuditor.java @@ -68,7 +68,7 @@ public class ComponentStateAuditor extends NiFiAuditor { // create the processor details FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails(); - processorDetails.setType(processor.getProcessor().getClass().getSimpleName()); + processorDetails.setType(processor.getComponentType()); // create the clear action FlowChangeAction configAction = new FlowChangeAction(); @@ -115,7 +115,7 @@ public class ComponentStateAuditor extends NiFiAuditor { // create the controller service details FlowChangeExtensionDetails controllerServiceDetails = new FlowChangeExtensionDetails(); - controllerServiceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName()); + controllerServiceDetails.setType(controllerService.getComponentType()); // create the clear action FlowChangeAction configAction = new FlowChangeAction(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java index 1fb5bb7c2d..ded90f868b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java @@ -123,7 +123,7 @@ public class ControllerServiceAuditor extends NiFiAuditor { // create the controller service details FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails(); - serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName()); + serviceDetails.setType(controllerService.getComponentType()); // create a controller service action Date actionTimestamp = new Date(); @@ -267,7 +267,7 @@ public class ControllerServiceAuditor extends NiFiAuditor { // create the processor details FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails(); - processorDetails.setType(processor.getProcessor().getClass().getSimpleName()); + processorDetails.setType(processor.getComponentType()); // create a processor action FlowChangeAction processorAction = new FlowChangeAction(); @@ -284,8 +284,8 @@ public class ControllerServiceAuditor extends NiFiAuditor { final ReportingTaskNode reportingTask = ((ReportingTaskNode) component); // create the reporting task details - FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails(); - processorDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName()); + FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails(); + taskDetails.setType(reportingTask.getComponentType()); // create a reporting task action FlowChangeAction reportingTaskAction = new FlowChangeAction(); @@ -295,7 +295,7 @@ public class ControllerServiceAuditor extends NiFiAuditor { reportingTaskAction.setSourceId(reportingTask.getIdentifier()); reportingTaskAction.setSourceName(reportingTask.getName()); reportingTaskAction.setSourceType(Component.ReportingTask); - reportingTaskAction.setComponentDetails(processorDetails); + reportingTaskAction.setComponentDetails(taskDetails); reportingTaskAction.setOperation(ScheduledState.RUNNING.equals(reportingTask.getScheduledState()) ? Operation.Start : Operation.Stop); actions.add(reportingTaskAction); } else if (component instanceof ControllerServiceNode) { @@ -303,7 +303,7 @@ public class ControllerServiceAuditor extends NiFiAuditor { // create the controller service details FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails(); - serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName()); + serviceDetails.setType(controllerService.getComponentType()); // create a controller service action FlowChangeAction serviceAction = new FlowChangeAction(); @@ -383,7 +383,7 @@ public class ControllerServiceAuditor extends NiFiAuditor { if (user != null) { // create the controller service details FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails(); - serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName()); + serviceDetails.setType(controllerService.getComponentType()); // create the controller service action for adding this controller service action = new FlowChangeAction(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java index 718def0205..8da70f05f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java @@ -132,7 +132,7 @@ public class ProcessorAuditor extends NiFiAuditor { // create the processor details FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails(); - processorDetails.setType(processor.getProcessor().getClass().getSimpleName()); + processorDetails.setType(processor.getComponentType()); // create a processor action Date actionTimestamp = new Date(); @@ -288,7 +288,7 @@ public class ProcessorAuditor extends NiFiAuditor { if (user != null) { // create the processor details FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails(); - processorDetails.setType(processor.getProcessor().getClass().getSimpleName()); + processorDetails.setType(processor.getComponentType()); // create the processor action for adding this processor action = new FlowChangeAction(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ReportingTaskAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ReportingTaskAuditor.java index 2e242d2f1d..0dc8ee308b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ReportingTaskAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ReportingTaskAuditor.java @@ -116,7 +116,7 @@ public class ReportingTaskAuditor extends NiFiAuditor { // create the reporting task details FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails(); - taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName()); + taskDetails.setType(reportingTask.getComponentType()); // create a reporting task action Date actionTimestamp = new Date(); @@ -272,7 +272,7 @@ public class ReportingTaskAuditor extends NiFiAuditor { if (user != null) { // create the reporting task details FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails(); - taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName()); + taskDetails.setType(reportingTask.getComponentType()); // create the reporting task action for adding this reporting task action = new FlowChangeAction(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java index 53a9474d77..66ddfbf1ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java @@ -181,7 +181,7 @@ public class ReportingTaskResource extends ApplicationResource { // authorize access serviceFacade.authorizeAccess(lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.READ); }); @@ -245,7 +245,7 @@ public class ReportingTaskResource extends ApplicationResource { // authorize access serviceFacade.authorizeAccess(lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.READ); }); @@ -300,7 +300,7 @@ public class ReportingTaskResource extends ApplicationResource { // authorize access serviceFacade.authorizeAccess(lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.WRITE); }); @@ -359,7 +359,7 @@ public class ReportingTaskResource extends ApplicationResource { if (isValidationPhase(httpServletRequest)) { // authorize access serviceFacade.authorizeAccess(lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.WRITE); }); serviceFacade.verifyCanClearReportingTaskState(id); @@ -442,7 +442,7 @@ public class ReportingTaskResource extends ApplicationResource { serviceFacade, revision, lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.WRITE); }, () -> serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO), @@ -524,7 +524,7 @@ public class ReportingTaskResource extends ApplicationResource { serviceFacade, revision, lookup -> { - final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + final Authorizable reportingTask = lookup.getReportingTask(id); reportingTask.authorize(authorizer, RequestAction.WRITE); }, () -> serviceFacade.verifyDeleteReportingTask(id), diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index ceef1d4f86..8ae07f2a69 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1094,7 +1094,7 @@ public final class DtoFactory { final ReportingTaskDTO dto = new ReportingTaskDTO(); dto.setId(reportingTaskNode.getIdentifier()); dto.setName(reportingTaskNode.getName()); - dto.setType(reportingTaskNode.getReportingTask().getClass().getName()); + dto.setType(reportingTaskNode.getComponentType()); dto.setSchedulingStrategy(reportingTaskNode.getSchedulingStrategy().name()); dto.setSchedulingPeriod(reportingTaskNode.getSchedulingPeriod()); dto.setState(reportingTaskNode.getScheduledState().name()); @@ -1166,7 +1166,7 @@ public final class DtoFactory { dto.setId(controllerServiceNode.getIdentifier()); dto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier()); dto.setName(controllerServiceNode.getName()); - dto.setType(controllerServiceNode.getControllerServiceImplementation().getClass().getName()); + dto.setType(controllerServiceNode.getComponentType()); dto.setState(controllerServiceNode.getState().name()); dto.setAnnotationData(controllerServiceNode.getAnnotationData()); dto.setComments(controllerServiceNode.getComments()); @@ -1238,7 +1238,7 @@ public final class DtoFactory { dto.setGroupId(node.getProcessGroup().getIdentifier()); dto.setState(node.getScheduledState().name()); dto.setActiveThreadCount(node.getActiveThreadCount()); - dto.setType(node.getProcessor().getClass().getName()); + dto.setType(node.getComponentType()); dto.setReferenceType(Processor.class.getSimpleName()); propertyDescriptors = node.getProcessor().getPropertyDescriptors(); @@ -1247,7 +1247,7 @@ public final class DtoFactory { } else if (component instanceof ControllerServiceNode) { final ControllerServiceNode node = ((ControllerServiceNode) component); dto.setState(node.getState().name()); - dto.setType(node.getControllerServiceImplementation().getClass().getName()); + dto.setType(node.getComponentType()); dto.setReferenceType(ControllerService.class.getSimpleName()); propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors(); @@ -1257,7 +1257,7 @@ public final class DtoFactory { final ReportingTaskNode node = ((ReportingTaskNode) component); dto.setState(node.getScheduledState().name()); dto.setActiveThreadCount(node.getActiveThreadCount()); - dto.setType(node.getReportingTask().getClass().getName()); + dto.setType(node.getComponentType()); dto.setReferenceType(ReportingTask.class.getSimpleName()); propertyDescriptors = node.getReportingTask().getPropertyDescriptors(); @@ -1853,7 +1853,7 @@ public final class DtoFactory { dto.setInputRequirement(node.getInputRequirement().name()); dto.setPersistsState(node.getProcessor().getClass().isAnnotationPresent(Stateful.class)); - dto.setType(node.getProcessor().getClass().getCanonicalName()); + dto.setType(node.getCanonicalClassName()); dto.setName(node.getName()); dto.setState(node.getScheduledState().toString()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 973efa845e..dd55753938 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -1445,7 +1445,11 @@ public class ControllerFacade implements Authorizable { for (final Relationship relationship : procNode.getRelationships()) { addIfAppropriate(searchStr, relationship.getName(), "Relationship", matches); } + + // Add both the actual class name and the component type. This allows us to search for 'Ghost' + // to search for components that could not be instantiated. addIfAppropriate(searchStr, processor.getClass().getSimpleName(), "Type", matches); + addIfAppropriate(searchStr, procNode.getComponentType(), "Type", matches); for (final Map.Entry entry : procNode.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey();