NIFI-1052: Added Ghost Processors, Ghost Reporting Tasks, Ghost Controller Services

This closes #499.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-06-03 19:41:43 -04:00 committed by Bryan Bende
parent 18133988a0
commit 8a447eec66
30 changed files with 586 additions and 91 deletions

View File

@ -262,4 +262,9 @@ public interface Connectable extends Triggerable, Authorizable, Positionable {
void verifyCanClearState() throws IllegalStateException;
SchedulingStrategy getSchedulingStrategy();
/**
* @return the type of the component. I.e., the class name of the implementation
*/
String getComponentType();
}

View File

@ -46,17 +46,22 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final ControllerServiceProvider serviceProvider;
private final AtomicReference<String> name;
private final AtomicReference<String> annotationData = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
private final Lock lock = new ReentrantLock();
private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
public AbstractConfiguredComponent(final ConfigurableComponent component, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass) {
this.id = id;
this.component = component;
this.validationContextFactory = validationContextFactory;
this.serviceProvider = serviceProvider;
this.name = new AtomicReference<>(component.getClass().getSimpleName());
this.componentType = componentType;
this.componentCanonicalClass = componentCanonicalClass;
}
@Override
@ -308,4 +313,14 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
ControllerServiceProvider getControllerServiceProvider() {
return this.serviceProvider;
}
@Override
public String getCanonicalClassName() {
return componentCanonicalClass;
}
@Override
public String getComponentType() {
return componentType;
}
}

View File

@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.Resource;
@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public abstract class AbstractPort implements Port {
public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()

View File

@ -65,4 +65,14 @@ public interface ConfiguredComponent extends Authorizable {
* @return the any validation errors for this connectable
*/
Collection<ValidationResult> getValidationErrors();
/**
* @return the type of the component. I.e., the class name of the implementation
*/
String getComponentType();
/**
* @return the Canonical Class Name of the component
*/
String getCanonicalClassName();
}

View File

@ -42,8 +42,9 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
protected final AtomicReference<ScheduledState> scheduledState;
public ProcessorNode(final Processor processor, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(processor, id, validationContextFactory, serviceProvider);
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass) {
super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
}

View File

@ -546,4 +546,9 @@ public class StandardFunnel implements Funnel {
public SchedulingStrategy getSchedulingStrategy() {
return SchedulingStrategy.TIMER_DRIVEN;
}
@Override
public String getComponentType() {
return "Funnel";
}
}

View File

@ -166,4 +166,9 @@ public class LocalPort extends AbstractPort {
public boolean isSideEffectFree() {
return true;
}
@Override
public String getComponentType() {
return "Local Port";
}
}

View File

@ -174,6 +174,7 @@ import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.GhostProcessor;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
@ -197,6 +198,7 @@ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.GhostReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
@ -965,9 +967,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@SuppressWarnings("deprecation")
public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
id = id.intern();
final Processor processor = instantiateProcessor(type, id);
boolean creationSuccessful;
Processor processor;
try {
processor = instantiateProcessor(type, id);
creationSuccessful = true;
} catch (final ProcessorInstantiationException pie) {
LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie);
final GhostProcessor ghostProc = new GhostProcessor();
ghostProc.setIdentifier(id);
ghostProc.setCanonicalClassName(type);
processor = ghostProc;
creationSuccessful = false;
}
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider);
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type);
}
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
@ -2456,7 +2479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setId(procNode.getIdentifier());
status.setGroupId(procNode.getProcessGroup().getIdentifier());
status.setName(procNode.getName());
status.setType(procNode.getProcessor().getClass().getSimpleName());
status.setType(procNode.getComponentType());
final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
if (entry == null) {
@ -2619,6 +2642,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
ReportingTask task = null;
boolean creationSuccessful = true;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
@ -2633,8 +2657,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
final Object reportingTaskObj = reportingTaskClass.newInstance();
task = reportingTaskClass.cast(reportingTaskObj);
} catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
throw new ReportingTaskInstantiationException(type, t);
} catch (final Exception e) {
LOG.error("Could not create Reporting Task of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
final GhostReportingTask ghostTask = new GhostReportingTask();
ghostTask.setIdentifier(id);
ghostTask.setCanonicalClassName(type);
task = ghostTask;
creationSuccessful = false;
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
@ -2642,7 +2671,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type);
}
taskNode.setName(task.getClass().getSimpleName());
if (firstTimeAdded) {

View File

@ -111,10 +111,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final AtomicReference<String> comments;
private final AtomicReference<Position> position;
private final AtomicReference<String> annotationData;
private final AtomicReference<String> schedulingPeriod; // stored as string
// so it's presented
// to user as they
// entered it
private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it
private final AtomicReference<String> yieldPeriod;
private final AtomicReference<String> penalizationPeriod;
private final AtomicReference<Map<String, String>> style;
@ -135,11 +132,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
// ??????? NOT any more
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider) {
this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider,
processor.getClass().getSimpleName(), processor.getClass().getCanonicalName());
}
@SuppressWarnings("deprecation")
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider) {
super(processor, uuid, validationContextFactory, controllerServiceProvider);
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider,
final String componentType, final String componentCanonicalClass) {
super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
this.processor = processor;
identifier = new AtomicReference<>(uuid);

View File

@ -53,10 +53,21 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private volatile String comment;
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory) {
this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName());
}
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory) {
super(reportingTask, id, validationContextFactory, controllerServiceProvider);
final ValidationContextFactory validationContextFactory,
final String componentType, final String componentCanonicalClass) {
super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
this.reportingTask = reportingTask;
this.processScheduler = processScheduler;
this.serviceLookup = controllerServiceProvider;

View File

@ -37,6 +37,13 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
this.flowController = controller;
}
public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final String componentType, final String canonicalClassName) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName);
this.flowController = controller;
}
@Override
public Authorizable getParentAuthorizable() {
return flowController;

View File

@ -125,13 +125,7 @@ public class ProcessContext {
void adjustCounter(final String name, final long delta) {
final String localContext = connectable.getName() + " (" + connectable.getIdentifier() + ")";
final String globalContext;
if (connectable instanceof ProcessorNode) {
globalContext = "All " + ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName() + "'s";
} else {
globalContext = "All " + connectable.getClass().getSimpleName() + "'s";
}
final String globalContext = "All " + connectable.getComponentType() + "'s";
counterRepo.adjustCounter(localContext, name, delta);
counterRepo.adjustCounter(globalContext, name, delta);

View File

@ -149,7 +149,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
switch (connectable.getConnectableType()) {
case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable;
componentType = procNode.getProcessor().getClass().getSimpleName();
componentType = procNode.getComponentType();
description = procNode.getProcessor().toString();
break;
case INPUT_PORT:
@ -1394,12 +1394,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
eventBuilder.setComponentId(context.getConnectable().getIdentifier());
final Connectable connectable = context.getConnectable();
final String processorType;
if (connectable instanceof ProcessorNode) {
processorType = ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName();
} else {
processorType = connectable.getClass().getSimpleName();
}
final String processorType = connectable.getComponentType();
eventBuilder.setComponentType(processorType);
eventBuilder.addParentFlowFile(parent);
@ -1684,15 +1679,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()});
final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
final String processorType;
final Connectable connectable = context.getConnectable();
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
processorType = procNode.getProcessor().getClass().getSimpleName();
} else {
processorType = connectable.getClass().getSimpleName();
}
final String processorType = connectable.getComponentType();
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
processorType, context.getProvenanceRepository(), this);

View File

@ -319,7 +319,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addStyle(element, processor.getStyle());
addTextElement(element, "comment", processor.getComments());
addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName());
addTextElement(element, "class", processor.getCanonicalClassName());
addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod());
addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod());
@ -428,7 +428,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(serviceElement, "id", serviceNode.getIdentifier());
addTextElement(serviceElement, "name", serviceNode.getName());
addTextElement(serviceElement, "comment", serviceNode.getComments());
addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName());
final ControllerServiceState state = serviceNode.getState();
final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
@ -444,7 +444,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(taskElement, "id", taskNode.getIdentifier());
addTextElement(taskElement, "name", taskNode.getName());
addTextElement(taskElement, "comment", taskNode.getComments());
addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
addTextElement(taskElement, "class", taskNode.getCanonicalClassName());
addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());

View File

@ -73,7 +73,16 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(implementation, id, validationContextFactory, serviceProvider);
this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider,
implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName());
}
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass) {
super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;

View File

@ -32,9 +32,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ConfiguredComponent;
@ -129,11 +132,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
try {
final ClassLoader cl = ExtensionManager.getClassLoader(type);
final Class<?> rawClass;
if (cl == null) {
rawClass = Class.forName(type);
} else {
Thread.currentThread().setContextClassLoader(cl);
rawClass = Class.forName(type, false, cl);
try {
if (cl == null) {
rawClass = Class.forName(type);
} else {
Thread.currentThread().setContextClassLoader(cl);
rawClass = Class.forName(type, false, cl);
}
} catch (final Exception e) {
logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
Thread.currentThread().setContextClassLoader(currentContextClassLoader);
return createGhostControllerService(type, id);
}
final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
@ -206,6 +216,57 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
private ControllerServiceNode createGhostControllerService(final String type, final String id) {
final InvocationHandler invocationHandler = new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
final String methodName = method.getName();
if ("validate".equals(methodName)) {
final ValidationResult result = new ValidationResult.Builder()
.input("Any Property")
.subject("Missing Controller Service")
.valid(false)
.explanation("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found")
.build();
return Collections.singleton(result);
} else if ("getPropertyDescriptor".equals(methodName)) {
final String propertyName = (String) args[0];
return new PropertyDescriptor.Builder()
.name(propertyName)
.description(propertyName)
.sensitive(true)
.required(true)
.build();
} else if ("getPropertyDescriptors".equals(methodName)) {
return Collections.emptyList();
} else if ("onPropertyModified".equals(methodName)) {
return null;
} else if ("getIdentifier".equals(methodName)) {
return id;
} else if ("toString".equals(methodName)) {
return "GhostControllerService[id=" + id + ", type=" + type + "]";
} else if ("hashCode".equals(methodName)) {
return 91 * type.hashCode() + 41 * id.hashCode();
} else if ("equals".equals(methodName)) {
return proxy == args[0];
} else {
throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found");
}
}
};
final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(),
new Class[] {ControllerService.class}, invocationHandler);
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id,
new StandardValidationContextFactory(this), this, componentType, type);
return serviceNode;
}
@Override
public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) {
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
@ -344,7 +405,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Enable Controller Services");
return t;
}
});
for (final List<ControllerServiceNode> branch : branches) {
final Runnable enableBranchRunnable = new Runnable() {
@Override
@ -358,6 +428,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
logger.info("Enabling {}", serviceNode);
try {
serviceNode.verifyCanEnable();
processScheduler.enableControllerService(serviceNode);
} catch (final Exception e) {
logger.error("Failed to enable " + serviceNode + " due to " + e);

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.exception.ProcessException;
public class GhostProcessor implements Processor {
private String id;
private String canonicalClassName;
public void setIdentifier(final String id) {
this.id = id;
}
public void setCanonicalClassName(final String canonicalClassName) {
this.canonicalClassName = canonicalClassName;
}
@Override
public Collection<ValidationResult> validate(final ValidationContext context) {
return Collections.singleton(new ValidationResult.Builder()
.input("Any Property")
.subject("Missing Processor")
.valid(false)
.explanation("Processor is of type " + canonicalClassName + ", but this is not a valid Processor type")
.build());
}
@Override
public PropertyDescriptor getPropertyDescriptor(final String name) {
return buildDescriptor(name);
}
private PropertyDescriptor buildDescriptor(final String propertyName) {
return new PropertyDescriptor.Builder()
.name(propertyName)
.description(propertyName)
.required(true)
.sensitive(true)
.build();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, String oldValue, String newValue) {
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
return Collections.emptyList();
}
@Override
public String getIdentifier() {
return id;
}
@Override
public void initialize(final ProcessorInitializationContext context) {
}
@Override
public Set<Relationship> getRelationships() {
return Collections.emptySet();
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// Should never get this far.
throw new ProcessException("Unable to instantiate Processor class");
}
@Override
public String toString() {
return "GhostProcessor[id=" + id + ", class=" + canonicalClassName + "]";
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.exception.ProcessException;
public class GhostReportingTask implements ReportingTask {
private String id;
private String canonicalClassName;
public void setIdentifier(final String id) {
this.id = id;
}
public void setCanonicalClassName(final String canonicalClassName) {
this.canonicalClassName = canonicalClassName;
}
@Override
public Collection<ValidationResult> validate(final ValidationContext context) {
return Collections.singleton(new ValidationResult.Builder()
.input("Any Property")
.subject("Missing Reporting Task")
.valid(false)
.explanation("Reporting Task is of type " + canonicalClassName + ", but this is not a valid Reporting Task type")
.build());
}
@Override
public PropertyDescriptor getPropertyDescriptor(final String name) {
return buildDescriptor(name);
}
private PropertyDescriptor buildDescriptor(final String propertyName) {
return new PropertyDescriptor.Builder()
.name(propertyName)
.description(propertyName)
.required(true)
.sensitive(true)
.build();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, String oldValue, String newValue) {
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
return Collections.emptyList();
}
@Override
public String getIdentifier() {
return id;
}
@Override
public String toString() {
return "GhostReportingTask[id=" + id + ", class=" + canonicalClassName + "]";
}
@Override
public void initialize(ReportingInitializationContext config) throws InitializationException {
}
@Override
public void onTrigger(ReportingContext context) {
throw new ProcessException("Unable to instantiate ReportingTask class");
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFlowController {
private FlowController controller;
@Before
public void setup() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
final KeyService keyService = Mockito.mock(KeyService.class);
final AuditService auditService = Mockito.mock(AuditService.class);
final StringEncryptor encryptor = StringEncryptor.createEncryptor();
final NiFiProperties properties = NiFiProperties.getInstance();
properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceEventRepository.class.getName());
properties.setProperty("nifi.remote.input.socket.port", "");
properties.setProperty("nifi.remote.input.secure", "");
final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, keyService, auditService, encryptor, bulletinRepo);
}
@After
public void cleanup() {
controller.shutdown(true);
}
@Test
public void testCreateMissingProcessor() throws ProcessorInstantiationException {
final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor");
assertNotNull(procNode);
assertEquals("org.apache.nifi.NonExistingProcessor", procNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingProcessor", procNode.getComponentType());
final PropertyDescriptor descriptor = procNode.getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
final Relationship relationship = procNode.getRelationship("my relationship");
assertEquals("my relationship", relationship.getName());
}
@Test
public void testCreateMissingReportingTask() throws ReportingTaskInstantiationException {
final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task", true);
assertNotNull(taskNode);
assertEquals("org.apache.nifi.NonExistingReportingTask", taskNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingReportingTask", taskNode.getComponentType());
final PropertyDescriptor descriptor = taskNode.getReportingTask().getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
}
@Test
public void testCreateMissingControllerService() throws ProcessorInstantiationException {
final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service", false);
assertNotNull(serviceNode);
assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType());
final ControllerService service = serviceNode.getControllerServiceImplementation();
final PropertyDescriptor descriptor = service.getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
assertEquals("GhostControllerService[id=1234-Controller-Service, type=org.apache.nifi.NonExistingControllerService]", service.toString());
service.hashCode(); // just make sure that an Exception is not thrown
assertTrue(service.equals(service));
assertFalse(service.equals(serviceNode));
}
}

View File

@ -174,6 +174,7 @@ public class TestStandardProcessSession {
when(connectable.getProcessGroup()).thenReturn(procGroup);
when(connectable.getIdentifier()).thenReturn("connectable-1");
when(connectable.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
when(connectable.getComponentType()).thenReturn("Unit Test Component");
Mockito.doAnswer(new Answer<Set<Connection>>() {
@Override

View File

@ -30,6 +30,9 @@ nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
nifi.state.management.configuration.file=src/test/resources/state-management.xml
nifi.state.management.provider.local=local-provider
# H2 Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE

View File

@ -455,4 +455,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public boolean isSideEffectFree() {
return false;
}
@Override
public String getComponentType() {
return "RemoteGroupPort";
}
}

View File

@ -567,4 +567,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
public boolean isSideEffectFree() {
return false;
}
@Override
public String getComponentType() {
return "RootGroupPort";
}
}

View File

@ -68,7 +68,7 @@ public class ComponentStateAuditor extends NiFiAuditor {
// create the processor details
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
processorDetails.setType(processor.getComponentType());
// create the clear action
FlowChangeAction configAction = new FlowChangeAction();
@ -115,7 +115,7 @@ public class ComponentStateAuditor extends NiFiAuditor {
// create the controller service details
FlowChangeExtensionDetails controllerServiceDetails = new FlowChangeExtensionDetails();
controllerServiceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
controllerServiceDetails.setType(controllerService.getComponentType());
// create the clear action
FlowChangeAction configAction = new FlowChangeAction();

View File

@ -123,7 +123,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
// create the controller service details
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
serviceDetails.setType(controllerService.getComponentType());
// create a controller service action
Date actionTimestamp = new Date();
@ -267,7 +267,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
// create the processor details
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
processorDetails.setType(processor.getComponentType());
// create a processor action
FlowChangeAction processorAction = new FlowChangeAction();
@ -284,8 +284,8 @@ public class ControllerServiceAuditor extends NiFiAuditor {
final ReportingTaskNode reportingTask = ((ReportingTaskNode) component);
// create the reporting task details
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails();
taskDetails.setType(reportingTask.getComponentType());
// create a reporting task action
FlowChangeAction reportingTaskAction = new FlowChangeAction();
@ -295,7 +295,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
reportingTaskAction.setSourceId(reportingTask.getIdentifier());
reportingTaskAction.setSourceName(reportingTask.getName());
reportingTaskAction.setSourceType(Component.ReportingTask);
reportingTaskAction.setComponentDetails(processorDetails);
reportingTaskAction.setComponentDetails(taskDetails);
reportingTaskAction.setOperation(ScheduledState.RUNNING.equals(reportingTask.getScheduledState()) ? Operation.Start : Operation.Stop);
actions.add(reportingTaskAction);
} else if (component instanceof ControllerServiceNode) {
@ -303,7 +303,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
// create the controller service details
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
serviceDetails.setType(controllerService.getComponentType());
// create a controller service action
FlowChangeAction serviceAction = new FlowChangeAction();
@ -383,7 +383,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
if (user != null) {
// create the controller service details
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
serviceDetails.setType(controllerService.getComponentType());
// create the controller service action for adding this controller service
action = new FlowChangeAction();

View File

@ -132,7 +132,7 @@ public class ProcessorAuditor extends NiFiAuditor {
// create the processor details
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
processorDetails.setType(processor.getComponentType());
// create a processor action
Date actionTimestamp = new Date();
@ -288,7 +288,7 @@ public class ProcessorAuditor extends NiFiAuditor {
if (user != null) {
// create the processor details
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
processorDetails.setType(processor.getComponentType());
// create the processor action for adding this processor
action = new FlowChangeAction();

View File

@ -116,7 +116,7 @@ public class ReportingTaskAuditor extends NiFiAuditor {
// create the reporting task details
FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails();
taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
taskDetails.setType(reportingTask.getComponentType());
// create a reporting task action
Date actionTimestamp = new Date();
@ -272,7 +272,7 @@ public class ReportingTaskAuditor extends NiFiAuditor {
if (user != null) {
// create the reporting task details
FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails();
taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
taskDetails.setType(reportingTask.getComponentType());
// create the reporting task action for adding this reporting task
action = new FlowChangeAction();

View File

@ -181,7 +181,7 @@ public class ReportingTaskResource extends ApplicationResource {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.READ);
});
@ -245,7 +245,7 @@ public class ReportingTaskResource extends ApplicationResource {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.READ);
});
@ -300,7 +300,7 @@ public class ReportingTaskResource extends ApplicationResource {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.WRITE);
});
@ -359,7 +359,7 @@ public class ReportingTaskResource extends ApplicationResource {
if (isValidationPhase(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.WRITE);
});
serviceFacade.verifyCanClearReportingTaskState(id);
@ -442,7 +442,7 @@ public class ReportingTaskResource extends ApplicationResource {
serviceFacade,
revision,
lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.WRITE);
},
() -> serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO),
@ -524,7 +524,7 @@ public class ReportingTaskResource extends ApplicationResource {
serviceFacade,
revision,
lookup -> {
final Authorizable reportingTask = lookup.getRemoteProcessGroup(id);
final Authorizable reportingTask = lookup.getReportingTask(id);
reportingTask.authorize(authorizer, RequestAction.WRITE);
},
() -> serviceFacade.verifyDeleteReportingTask(id),

View File

@ -1094,7 +1094,7 @@ public final class DtoFactory {
final ReportingTaskDTO dto = new ReportingTaskDTO();
dto.setId(reportingTaskNode.getIdentifier());
dto.setName(reportingTaskNode.getName());
dto.setType(reportingTaskNode.getReportingTask().getClass().getName());
dto.setType(reportingTaskNode.getComponentType());
dto.setSchedulingStrategy(reportingTaskNode.getSchedulingStrategy().name());
dto.setSchedulingPeriod(reportingTaskNode.getSchedulingPeriod());
dto.setState(reportingTaskNode.getScheduledState().name());
@ -1166,7 +1166,7 @@ public final class DtoFactory {
dto.setId(controllerServiceNode.getIdentifier());
dto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
dto.setName(controllerServiceNode.getName());
dto.setType(controllerServiceNode.getControllerServiceImplementation().getClass().getName());
dto.setType(controllerServiceNode.getComponentType());
dto.setState(controllerServiceNode.getState().name());
dto.setAnnotationData(controllerServiceNode.getAnnotationData());
dto.setComments(controllerServiceNode.getComments());
@ -1238,7 +1238,7 @@ public final class DtoFactory {
dto.setGroupId(node.getProcessGroup().getIdentifier());
dto.setState(node.getScheduledState().name());
dto.setActiveThreadCount(node.getActiveThreadCount());
dto.setType(node.getProcessor().getClass().getName());
dto.setType(node.getComponentType());
dto.setReferenceType(Processor.class.getSimpleName());
propertyDescriptors = node.getProcessor().getPropertyDescriptors();
@ -1247,7 +1247,7 @@ public final class DtoFactory {
} else if (component instanceof ControllerServiceNode) {
final ControllerServiceNode node = ((ControllerServiceNode) component);
dto.setState(node.getState().name());
dto.setType(node.getControllerServiceImplementation().getClass().getName());
dto.setType(node.getComponentType());
dto.setReferenceType(ControllerService.class.getSimpleName());
propertyDescriptors = node.getControllerServiceImplementation().getPropertyDescriptors();
@ -1257,7 +1257,7 @@ public final class DtoFactory {
final ReportingTaskNode node = ((ReportingTaskNode) component);
dto.setState(node.getScheduledState().name());
dto.setActiveThreadCount(node.getActiveThreadCount());
dto.setType(node.getReportingTask().getClass().getName());
dto.setType(node.getComponentType());
dto.setReferenceType(ReportingTask.class.getSimpleName());
propertyDescriptors = node.getReportingTask().getPropertyDescriptors();
@ -1853,7 +1853,7 @@ public final class DtoFactory {
dto.setInputRequirement(node.getInputRequirement().name());
dto.setPersistsState(node.getProcessor().getClass().isAnnotationPresent(Stateful.class));
dto.setType(node.getProcessor().getClass().getCanonicalName());
dto.setType(node.getCanonicalClassName());
dto.setName(node.getName());
dto.setState(node.getScheduledState().toString());

View File

@ -1445,7 +1445,11 @@ public class ControllerFacade implements Authorizable {
for (final Relationship relationship : procNode.getRelationships()) {
addIfAppropriate(searchStr, relationship.getName(), "Relationship", matches);
}
// Add both the actual class name and the component type. This allows us to search for 'Ghost'
// to search for components that could not be instantiated.
addIfAppropriate(searchStr, processor.getClass().getSimpleName(), "Type", matches);
addIfAppropriate(searchStr, procNode.getComponentType(), "Type", matches);
for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();