NIFI-4151: Updated UpdateAttribute to only create JAXB Context once; Minor performance tweaks to standard validators and StatusMerge.prettyPrint; updated AbstractConfiguredComponent to not create a new ValidationContext each time that validate is called but only when needed; updated FlowController, StandardControllerServiceProvider, and StandardProcessGroup so that component lookups can be performed using a ConcurrentMap at FlowController level instead of having to perform a depth-first search through all ProcessGroups when calling findProcessor(), findProcessGroup(), findXYZ()

This closes #1979
This commit is contained in:
Mark Payne 2017-07-05 11:24:01 -04:00 committed by Matt Gilman
parent cff81c0cd2
commit ba56774fa1
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
12 changed files with 353 additions and 137 deletions

View File

@ -391,6 +391,8 @@ public class StandardValidators {
}
public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
@ -400,7 +402,7 @@ public class StandardValidators {
if (input == null) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
}
if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) {
if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
@ -416,6 +418,8 @@ public class StandardValidators {
};
public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
private final Pattern DATA_SIZE_PATTERN = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
@ -430,7 +434,7 @@ public class StandardValidators {
.explanation("Data Size cannot be null")
.build();
}
if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) {
if (DATA_SIZE_PATTERN.matcher(input.toUpperCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
@ -706,8 +710,7 @@ public class StandardValidators {
//
//
static class TimePeriodValidator implements Validator {
private final Pattern pattern;
private static final Pattern pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
private final long minNanos;
private final long maxNanos;
@ -716,8 +719,6 @@ public class StandardValidators {
private final String maxValueEnglish;
public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) {
pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit);
this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit);
this.minValueEnglish = minValue + " " + minTimeUnit.toString();

View File

@ -61,6 +61,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StatusMerger {
private static final String ZERO_COUNT = "0";
private static final String ZERO_BYTES = "0 bytes";
private static final String ZERO_COUNT_AND_BYTES = "0 (0 bytes)";
private static final String EMPTY_COUNT = "-";
private static final String EMPTY_BYTES = "-";
public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
if (target == null || toMerge == null) {
return;
@ -743,14 +749,32 @@ public class StatusMerger {
}
public static String formatCount(final Integer intStatus) {
return intStatus == null ? "-" : FormatUtils.formatCount(intStatus);
if (intStatus == null) {
return EMPTY_COUNT;
}
if (intStatus == 0) {
return ZERO_COUNT;
}
return FormatUtils.formatCount(intStatus);
}
public static String formatDataSize(final Long longStatus) {
return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus);
if (longStatus == null) {
return EMPTY_BYTES;
}
if (longStatus == 0L) {
return ZERO_BYTES;
}
return FormatUtils.formatDataSize(longStatus);
}
public static String prettyPrint(final Integer count, final Long bytes) {
if (count != null && bytes != null && count == 0 && bytes == 0L) {
return ZERO_COUNT_AND_BYTES;
}
return formatCount(count) + " (" + formatDataSize(bytes) + ")";
}

View File

@ -61,6 +61,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final ControllerServiceProvider serviceProvider;
private final AtomicReference<String> name;
private final AtomicReference<String> annotationData = new AtomicReference<>();
private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
private final VariableRegistry variableRegistry;
@ -118,6 +119,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public void setAnnotationData(final String data) {
invalidateValidationContext();
annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data));
}
@ -435,6 +437,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
invalidateValidationContext();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) {
getComponent().onPropertyModified(descriptor, oldValue, newValue);
}
@ -449,8 +452,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public boolean isValid() {
final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(
getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()));
final Collection<ValidationResult> validationResults = validate(getValidationContext());
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
@ -470,8 +472,13 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
try {
final ValidationContext validationContext = validationContextFactory.newValidationContext(
serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final ValidationContext validationContext;
if (serviceIdentifiersNotToValidate == null || serviceIdentifiersNotToValidate.isEmpty()) {
validationContext = getValidationContext();
} else {
validationContext = getValidationContextFactory().newValidationContext(serviceIdentifiersNotToValidate,
getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
}
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) {
@ -515,6 +522,25 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return this.validationContextFactory;
}
protected void invalidateValidationContext() {
this.validationContext.set(null);
}
protected ValidationContext getValidationContext() {
while (true) {
ValidationContext context = this.validationContext.get();
if (context != null) {
return context;
}
context = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final boolean updated = validationContext.compareAndSet(null, context);
if (updated) {
return context;
}
}
}
protected VariableRegistry getVariableRegistry() {
return this.variableRegistry;
}

View File

@ -18,7 +18,6 @@ package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import com.sun.jersey.api.client.ClientHandlerException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -47,7 +46,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -248,6 +249,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
@ -301,6 +304,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final VariableRegistry variableRegistry;
private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Port> allInputPorts = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Port> allOutputPorts = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Funnel> allFunnels = new ConcurrentHashMap<>();
private volatile ZooKeeperStateServer zooKeeperStateServer;
// The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@ -532,7 +542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
nifiProperties, encryptor, this, this.variableRegistry);
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
rootGroupRef.set(rootGroup);
setRootGroup(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties);
@ -1660,6 +1670,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
allProcessGroups.put(group.getIdentifier(), group);
allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group);
} finally {
writeLock.unlock();
}
@ -2364,12 +2376,76 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @return the process group or null if not group is found
*/
public ProcessGroup getGroup(final String id) {
requireNonNull(id);
final ProcessGroup root = getRootGroup();
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
return root == null ? null : root.findProcessGroup(searchId);
return allProcessGroups.get(requireNonNull(id));
}
public void onProcessGroupAdded(final ProcessGroup group) {
allProcessGroups.put(group.getIdentifier(), group);
}
public void onProcessGroupRemoved(final ProcessGroup group) {
allProcessGroups.remove(group.getIdentifier());
}
public void onProcessorAdded(final ProcessorNode procNode) {
allProcessors.put(procNode.getIdentifier(), procNode);
}
public void onProcessorRemoved(final ProcessorNode procNode) {
allProcessors.remove(procNode.getIdentifier());
}
public ProcessorNode getProcessorNode(final String id) {
return allProcessors.get(id);
}
public void onConnectionAdded(final Connection connection) {
allConnections.put(connection.getIdentifier(), connection);
}
public void onConnectionRemoved(final Connection connection) {
allConnections.remove(connection.getIdentifier());
}
public Connection getConnection(final String id) {
return allConnections.get(id);
}
public void onInputPortAdded(final Port inputPort) {
allInputPorts.put(inputPort.getIdentifier(), inputPort);
}
public void onInputPortRemoved(final Port inputPort) {
allInputPorts.remove(inputPort.getIdentifier());
}
public Port getInputPort(final String id) {
return allInputPorts.get(id);
}
public void onOutputPortAdded(final Port outputPort) {
allOutputPorts.put(outputPort.getIdentifier(), outputPort);
}
public void onOutputPortRemoved(final Port outputPort) {
allOutputPorts.remove(outputPort.getIdentifier());
}
public Port getOutputPort(final String id) {
return allOutputPorts.get(id);
}
public void onFunnelAdded(final Funnel funnel) {
allFunnels.put(funnel.getIdentifier(), funnel);
}
public void onFunnelRemoved(final Funnel funnel) {
allFunnels.remove(funnel.getIdentifier());
}
public Funnel getFunnel(final String id) {
return allFunnels.get(id);
}
/**
* Returns the status of all components in the controller. This request is
* not in the context of a user so the results will be unfiltered.
@ -3487,7 +3563,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public int getActiveThreadCount() {
return getGroupStatus(getRootGroupId()).getActiveThreadCount();
final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount();
return timerDrivenCount + eventDrivenCount;
}
private RepositoryStatusReport getProcessorStats() {

View File

@ -964,9 +964,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public boolean isValid() {
try {
final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final ValidationContext validationContext = getValidationContext();
final Collection<ValidationResult> validationResults = super.validate(validationContext);
for (final ValidationResult result : validationResults) {
@ -1011,8 +1009,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
// we are willing to make in order to save on validation costs that would be unnecessary most of the time.
if (getScheduledState() == ScheduledState.STOPPED) {
final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
final ValidationContext validationContext = getValidationContext();
final Collection<ValidationResult> validationResults = super.validate(validationContext);
@ -1111,6 +1108,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public synchronized void setProcessGroup(final ProcessGroup group) {
this.processGroup.set(group);
invalidateValidationContext();
}
@Override

View File

@ -208,6 +208,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
writeLock.lock();
try {
this.processGroup = group;
invalidateValidationContext();
} finally {
writeLock.unlock();
}

View File

@ -30,6 +30,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -79,6 +81,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final FlowController flowController;
private final NiFiProperties nifiProperties;
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
@ -158,6 +162,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
serviceCache.put(id, serviceNode);
return serviceNode;
} catch (final Throwable t) {
throw new ControllerServiceInstantiationException(t);
@ -222,6 +228,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true);
serviceCache.putIfAbsent(id, serviceNode);
return serviceNode;
}
@ -459,12 +467,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
final ProcessGroup rootGroup = getRootGroup();
// Find the Process Group that owns the component.
ProcessGroup groupOfInterest = null;
final ProcessorNode procNode = rootGroup.findProcessor(componentId);
final ProcessorNode procNode = flowController.getProcessorNode(componentId);
if (procNode == null) {
final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
if (serviceNode == null) {
@ -523,7 +529,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return rootServiceNode;
}
return getRootGroup().findControllerService(serviceIdentifier);
return serviceCache.get(serviceIdentifier);
}
@Override
@ -533,16 +539,17 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
serviceNodes = flowController.getRootControllerServices();
} else {
ProcessGroup group = getRootGroup();
if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
if (FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) || group.getIdentifier().equals(groupId)) {
serviceNodes = new HashSet<>(serviceCache.values());
} else {
group = group.findProcessGroup(groupId);
}
if (group == null) {
return Collections.emptySet();
}
serviceNodes = group.getControllerServices(true);
}
}
final Set<String> identifiers = new HashSet<>();
for (final ControllerServiceNode serviceNode : serviceNodes) {
@ -570,13 +577,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
group.removeControllerService(serviceNode);
ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
serviceCache.remove(serviceNode.getIdentifier());
}
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
final Set<ControllerServiceNode> allServices = new HashSet<>();
allServices.addAll(flowController.getRootControllerServices());
allServices.addAll(getRootGroup().findAllControllerServices());
allServices.addAll(serviceCache.values());
return allServices;
}

View File

@ -405,6 +405,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
flowController.onInputPortAdded(port);
} finally {
writeLock.unlock();
}
@ -439,6 +440,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
}
flowController.onInputPortRemoved(port);
LOG.info("Input Port {} removed from flow", port);
} finally {
writeLock.unlock();
@ -484,6 +486,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
flowController.onOutputPortAdded(port);
} finally {
writeLock.unlock();
}
@ -509,6 +512,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
}
flowController.onOutputPortRemoved(port);
LOG.info("Output Port {} removed from flow", port);
} finally {
writeLock.unlock();
@ -545,6 +549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
group.setParent(this);
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
flowController.onProcessGroupAdded(group);
} finally {
writeLock.unlock();
}
@ -584,6 +589,7 @@ public final class StandardProcessGroup implements ProcessGroup {
removeComponents(group);
processGroups.remove(group.getIdentifier());
flowController.onProcessGroupRemoved(group);
LOG.info("{} removed from flow", group);
} finally {
writeLock.unlock();
@ -704,6 +710,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setProcessGroup(this);
processors.put(processorId, processor);
flowController.onProcessorAdded(processor);
} finally {
writeLock.unlock();
}
@ -745,6 +752,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
processors.remove(id);
flowController.onProcessorRemoved(processor);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
@ -884,6 +892,7 @@ public final class StandardProcessGroup implements ProcessGroup {
destination.addConnection(connection);
}
connections.put(connection.getIdentifier(), connection);
flowController.onConnectionAdded(connection);
} finally {
writeLock.unlock();
}
@ -943,6 +952,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// remove the connection from our map
connections.remove(connection.getIdentifier());
LOG.info("{} removed from flow", connection);
flowController.onConnectionRemoved(connection);
} finally {
writeLock.unlock();
}
@ -970,25 +980,21 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Connection findConnection(final String id) {
return findConnection(id, this);
final Connection connection = flowController.getConnection(id);
if (connection == null) {
return null;
}
private Connection findConnection(final String id, final ProcessGroup start) {
Connection connection = start.getConnection(id);
if (connection != null) {
// We found a Connection in the Controller, but we only want to return it if
// the Process Group is this or is a child of this.
if (isOwner(connection.getProcessGroup())) {
return connection;
}
for (final ProcessGroup group : start.getProcessGroups()) {
connection = findConnection(id, group);
if (connection != null) {
return connection;
}
}
return null;
}
@Override
public List<Connection> findAllConnections() {
return findAllConnections(this);
@ -1386,19 +1392,19 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessGroup findProcessGroup(final String id) {
return findProcessGroup(requireNonNull(id), this);
if (requireNonNull(id).equals(getIdentifier())) {
return this;
}
private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) {
if (id.equals(start.getIdentifier())) {
return start;
final ProcessGroup group = flowController.getGroup(id);
if (group == null) {
return null;
}
for (final ProcessGroup group : start.getProcessGroups()) {
final ProcessGroup matching = findProcessGroup(id, group);
if (matching != null) {
return matching;
}
// We found a Processor in the Controller, but we only want to return it if
// the Process Group is this or is a child of this.
if (isOwner(group.getParent())) {
return group;
}
return null;
@ -1453,25 +1459,32 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessorNode findProcessor(final String id) {
return findProcessor(id, this);
final ProcessorNode node = flowController.getProcessorNode(id);
if (node == null) {
return null;
}
private ProcessorNode findProcessor(final String id, final ProcessGroup start) {
ProcessorNode node = start.getProcessor(id);
if (node != null) {
// We found a Processor in the Controller, but we only want to return it if
// the Process Group is this or is a child of this.
if (isOwner(node.getProcessGroup())) {
return node;
}
for (final ProcessGroup group : start.getProcessGroups()) {
node = findProcessor(id, group);
if (node != null) {
return node;
}
}
return null;
}
private boolean isOwner(ProcessGroup owner) {
while (owner != this && owner != null) {
owner = owner.getParent();
}
if (owner == this) {
return true;
}
return false;
}
@Override
public List<ProcessorNode> findAllProcessors() {
return findAllProcessors(this);
@ -1521,6 +1534,7 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
@Override
public RemoteGroupPort findRemoteGroupPort(final String identifier) {
return findRemoteGroupPort(identifier, this);
}
@ -1584,7 +1598,16 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findInputPort(final String id) {
return findPort(id, this, new InputPortRetriever());
final Port port = flowController.getInputPort(id);
if (port == null) {
return null;
}
if (isOwner(port.getProcessGroup())) {
return port;
}
return null;
}
@Override
@ -1602,7 +1625,16 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findOutputPort(final String id) {
return findPort(id, this, new OutputPortRetriever());
final Port port = flowController.getOutputPort(id);
if (port == null) {
return null;
}
if (isOwner(port.getProcessGroup())) {
return port;
}
return null;
}
@Override
@ -1674,21 +1706,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) {
Port port = retriever.getPort(group, id);
if (port != null) {
return port;
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
port = findPort(id, childGroup, retriever);
if (port != null) {
return port;
}
}
return null;
}
private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
for (final Port port : retriever.getPorts(group)) {
@ -1716,6 +1733,7 @@ public final class StandardProcessGroup implements ProcessGroup {
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
flowController.onFunnelAdded(funnel);
if (autoStart) {
startFunnel(funnel);
@ -1737,25 +1755,19 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Funnel findFunnel(final String id) {
return findFunnel(id, this);
}
private Funnel findFunnel(final String id, final ProcessGroup start) {
Funnel funnel = start.getFunnel(id);
if (funnel != null) {
final Funnel funnel = flowController.getFunnel(id);
if (funnel == null) {
return funnel;
}
for (final ProcessGroup group : start.getProcessGroups()) {
funnel = findFunnel(id, group);
if (funnel != null) {
if (isOwner(funnel.getProcessGroup())) {
return funnel;
}
}
return null;
}
@Override
public ControllerServiceNode findControllerService(final String id) {
return findControllerService(id, this);
@ -1814,6 +1826,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
funnels.remove(funnel.getIdentifier());
flowController.onFunnelRemoved(funnel);
LOG.info("{} removed from flow", funnel);
} finally {
writeLock.unlock();

View File

@ -16,7 +16,27 @@
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -61,29 +81,13 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestStandardProcessScheduler {
@ -121,7 +125,26 @@ public class TestStandardProcessScheduler {
taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, reloadComponent);
controller = Mockito.mock(FlowController.class);
rootGroup = new MockProcessGroup();
final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
Mockito.doAnswer(new Answer<ProcessorNode>() {
@Override
public ProcessorNode answer(InvocationOnMock invocation) throws Throwable {
final String id = invocation.getArgumentAt(0, String.class);
return processorMap.get(id);
}
}).when(controller).getProcessorNode(Mockito.anyString());
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class);
processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
return null;
}
}).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
rootGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup);
}

View File

@ -42,9 +42,12 @@ import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.beans.PropertyDescriptor;
import java.util.Arrays;
@ -55,6 +58,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -88,6 +93,7 @@ public class TestStandardControllerServiceProvider {
private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
private static NiFiProperties niFiProperties;
private static Bundle systemBundle;
private FlowController controller;
@BeforeClass
public static void setNiFiProps() {
@ -99,6 +105,29 @@ public class TestStandardControllerServiceProvider {
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
}
@Before
public void setup() {
controller = Mockito.mock(FlowController.class);
final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
Mockito.doAnswer(new Answer<ProcessorNode>() {
@Override
public ProcessorNode answer(InvocationOnMock invocation) throws Throwable {
final String id = invocation.getArgumentAt(0, String.class);
return processorMap.get(id);
}
}).when(controller).getProcessorNode(Mockito.anyString());
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class);
processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
return null;
}
}).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
}
private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, niFiProperties);
}
@ -111,7 +140,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testDisableControllerService() {
final ProcessGroup procGroup = new MockProcessGroup();
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@ -127,7 +156,7 @@ public class TestStandardControllerServiceProvider {
@Test(timeout = 10000)
public void testEnableDisableWithReference() {
final ProcessGroup group = new MockProcessGroup();
final ProcessGroup group = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
@ -190,7 +219,7 @@ public class TestStandardControllerServiceProvider {
}
public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) {
final ProcessGroup procGroup = new MockProcessGroup();
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@ -246,7 +275,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testOrderingOfServices() {
final ProcessGroup procGroup = new MockProcessGroup();
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@ -405,7 +434,7 @@ public class TestStandardControllerServiceProvider {
new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties,
VariableRegistry.EMPTY_REGISTRY, reloadComponent);
final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null, variableRegistry);
final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class), variableRegistry);
group.addProcessor(procNode);
procNode.setProcessGroup(group);
@ -414,7 +443,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testEnableReferencingComponents() {
final ProcessGroup procGroup = new MockProcessGroup();
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@ -442,7 +471,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider =
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
ProcessGroup procGroup = new MockProcessGroup();
ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode A = provider.createControllerService(ServiceA.class.getName(), "A",
@ -493,7 +522,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
stateManagerProvider, variableRegistry, niFiProperties);
ProcessGroup procGroup = new MockProcessGroup();
ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode A = provider.createControllerService(ServiceC.class.getName(), "A",
@ -535,7 +564,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider =
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
ProcessGroup procGroup = new MockProcessGroup();
ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",

View File

@ -17,6 +17,13 @@
package org.apache.nifi.controller.service.mock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.connectable.Connectable;
@ -25,6 +32,7 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
@ -35,16 +43,14 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MockProcessGroup implements ProcessGroup {
private final Map<String, ControllerServiceNode> serviceMap = new HashMap<>();
private final Map<String, ProcessorNode> processorMap = new HashMap<>();
private final FlowController flowController;
public MockProcessGroup(final FlowController flowController) {
this.flowController = flowController;
}
@Override
public Authorizable getParentAuthorizable() {
@ -260,11 +266,13 @@ public class MockProcessGroup implements ProcessGroup {
public void addProcessor(final ProcessorNode processor) {
processor.setProcessGroup(this);
processorMap.put(processor.getIdentifier(), processor);
flowController.onProcessorAdded(processor);
}
@Override
public void removeProcessor(final ProcessorNode processor) {
processorMap.remove(processor.getIdentifier());
flowController.onProcessorRemoved(processor);
}
@Override

View File

@ -35,6 +35,15 @@ import org.apache.nifi.update.attributes.Rule;
*
*/
public class CriteriaSerDe {
private static final JAXBContext JAXB_CONTEXT;
static {
try {
JAXB_CONTEXT = JAXBContext.newInstance(CriteriaBinding.class);
} catch (JAXBException e) {
throw new RuntimeException("Could not create JAXB Context for UpdateAttribute", e);
}
}
/**
* Handles the Criteria binding during the (de)serialization process. This
@ -86,8 +95,7 @@ public class CriteriaSerDe {
binding.setRules(criteria.getRules());
// serialize the binding
final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class);
final Marshaller marshaller = context.createMarshaller();
final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
marshaller.marshal(binding, writer);
@ -110,8 +118,7 @@ public class CriteriaSerDe {
if (string != null && !string.trim().equals("")) {
try {
// deserialize the binding
final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class);
final Unmarshaller unmarshaller = context.createUnmarshaller();
final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
final Source source = new StreamSource(new StringReader(string));
final JAXBElement<CriteriaBinding> element = unmarshaller.unmarshal(source, CriteriaBinding.class);