From ba56774fa1c16d935f592df482e74bfa1564b5a3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 5 Jul 2017 11:24:01 -0400 Subject: [PATCH] NIFI-4151: Updated UpdateAttribute to only create JAXB Context once; Minor performance tweaks to standard validators and StatusMerge.prettyPrint; updated AbstractConfiguredComponent to not create a new ValidationContext each time that validate is called but only when needed; updated FlowController, StandardControllerServiceProvider, and StandardProcessGroup so that component lookups can be performed using a ConcurrentMap at FlowController level instead of having to perform a depth-first search through all ProcessGroups when calling findProcessor(), findProcessGroup(), findXYZ() This closes #1979 --- .../processor/util/StandardValidators.java | 13 +- .../nifi/cluster/manager/StatusMerger.java | 28 +++- .../AbstractConfiguredComponent.java | 34 ++++- .../nifi/controller/FlowController.java | 92 +++++++++++- .../controller/StandardProcessorNode.java | 8 +- .../StandardControllerServiceNode.java | 1 + .../StandardControllerServiceProvider.java | 30 ++-- .../nifi/groups/StandardProcessGroup.java | 137 ++++++++++-------- .../TestStandardProcessScheduler.java | 63 +++++--- ...TestStandardControllerServiceProvider.java | 47 ++++-- .../service/mock/MockProcessGroup.java | 22 ++- .../attributes/serde/CriteriaSerDe.java | 15 +- 12 files changed, 353 insertions(+), 137 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index d0170b82c7..a5963304d8 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -391,6 +391,8 @@ public class StandardValidators { } public static final Validator TIME_PERIOD_VALIDATOR = new Validator() { + private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX); + @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { @@ -400,7 +402,7 @@ public class StandardValidators { if (input == null) { return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build(); } - if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) { + if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches()) { return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); } else { return new ValidationResult.Builder() @@ -416,6 +418,8 @@ public class StandardValidators { }; public static final Validator DATA_SIZE_VALIDATOR = new Validator() { + private final Pattern DATA_SIZE_PATTERN = Pattern.compile(DataUnit.DATA_SIZE_REGEX); + @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { @@ -430,7 +434,7 @@ public class StandardValidators { .explanation("Data Size cannot be null") .build(); } - if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) { + if (DATA_SIZE_PATTERN.matcher(input.toUpperCase()).matches()) { return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); } else { return new ValidationResult.Builder() @@ -706,8 +710,7 @@ public class StandardValidators { // // static class TimePeriodValidator implements Validator { - - private final Pattern pattern; + private static final Pattern pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX); private final long minNanos; private final long maxNanos; @@ -716,8 +719,6 @@ public class StandardValidators { private final String maxValueEnglish; public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) { - pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX); - this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit); this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit); this.minValueEnglish = minValue + " " + minTimeUnit.toString(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index a876d51b76..9cceaf7c08 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -61,6 +61,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; public class StatusMerger { + private static final String ZERO_COUNT = "0"; + private static final String ZERO_BYTES = "0 bytes"; + private static final String ZERO_COUNT_AND_BYTES = "0 (0 bytes)"; + private static final String EMPTY_COUNT = "-"; + private static final String EMPTY_BYTES = "-"; + public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) { if (target == null || toMerge == null) { return; @@ -743,14 +749,32 @@ public class StatusMerger { } public static String formatCount(final Integer intStatus) { - return intStatus == null ? "-" : FormatUtils.formatCount(intStatus); + if (intStatus == null) { + return EMPTY_COUNT; + } + if (intStatus == 0) { + return ZERO_COUNT; + } + + return FormatUtils.formatCount(intStatus); } public static String formatDataSize(final Long longStatus) { - return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus); + if (longStatus == null) { + return EMPTY_BYTES; + } + if (longStatus == 0L) { + return ZERO_BYTES; + } + + return FormatUtils.formatDataSize(longStatus); } public static String prettyPrint(final Integer count, final Long bytes) { + if (count != null && bytes != null && count == 0 && bytes == 0L) { + return ZERO_COUNT_AND_BYTES; + } + return formatCount(count) + " (" + formatDataSize(bytes) + ")"; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 76c5d7a048..2aa84f8b84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -61,6 +61,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final ControllerServiceProvider serviceProvider; private final AtomicReference name; private final AtomicReference annotationData = new AtomicReference<>(); + private final AtomicReference validationContext = new AtomicReference<>(); private final String componentType; private final String componentCanonicalClass; private final VariableRegistry variableRegistry; @@ -118,6 +119,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public void setAnnotationData(final String data) { + invalidateValidationContext(); annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data)); } @@ -435,6 +437,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + invalidateValidationContext(); try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { getComponent().onPropertyModified(descriptor, oldValue, newValue); } @@ -449,8 +452,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public boolean isValid() { - final Collection validationResults = validate(validationContextFactory.newValidationContext( - getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier())); + final Collection validationResults = validate(getValidationContext()); for (final ValidationResult result : validationResults) { if (!result.isValid()) { @@ -470,8 +472,13 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone final List results = new ArrayList<>(); lock.lock(); try { - final ValidationContext validationContext = validationContextFactory.newValidationContext( - serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); + final ValidationContext validationContext; + if (serviceIdentifiersNotToValidate == null || serviceIdentifiersNotToValidate.isEmpty()) { + validationContext = getValidationContext(); + } else { + validationContext = getValidationContextFactory().newValidationContext(serviceIdentifiersNotToValidate, + getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); + } final Collection validationResults; try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { @@ -515,6 +522,25 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone return this.validationContextFactory; } + protected void invalidateValidationContext() { + this.validationContext.set(null); + } + + protected ValidationContext getValidationContext() { + while (true) { + ValidationContext context = this.validationContext.get(); + if (context != null) { + return context; + } + + context = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); + final boolean updated = validationContext.compareAndSet(null, context); + if (updated) { + return context; + } + } + } + protected VariableRegistry getVariableRegistry() { return this.variableRegistry; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index aef6d46d40..9438946892 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -18,7 +18,6 @@ package org.apache.nifi.controller; import static java.util.Objects.requireNonNull; -import com.sun.jersey.api.client.ClientHandlerException; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -47,7 +46,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; + import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -248,6 +249,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.ClientHandlerException; + public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent { @@ -301,6 +304,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final VariableRegistry variableRegistry; private final ConcurrentMap rootControllerServices = new ConcurrentHashMap<>(); + private final ConcurrentMap allProcessors = new ConcurrentHashMap<>(); + private final ConcurrentMap allProcessGroups = new ConcurrentHashMap<>(); + private final ConcurrentMap allConnections = new ConcurrentHashMap<>(); + private final ConcurrentMap allInputPorts = new ConcurrentHashMap<>(); + private final ConcurrentMap allOutputPorts = new ConcurrentHashMap<>(); + private final ConcurrentMap allFunnels = new ConcurrentHashMap<>(); + private volatile ZooKeeperStateServer zooKeeperStateServer; // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may @@ -532,7 +542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, nifiProperties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); - rootGroupRef.set(rootGroup); + setRootGroup(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties); @@ -1660,6 +1670,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // update the heartbeat bean this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary())); + allProcessGroups.put(group.getIdentifier(), group); + allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group); } finally { writeLock.unlock(); } @@ -2364,12 +2376,76 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @return the process group or null if not group is found */ public ProcessGroup getGroup(final String id) { - requireNonNull(id); - final ProcessGroup root = getRootGroup(); - final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; - return root == null ? null : root.findProcessGroup(searchId); + return allProcessGroups.get(requireNonNull(id)); } + public void onProcessGroupAdded(final ProcessGroup group) { + allProcessGroups.put(group.getIdentifier(), group); + } + + public void onProcessGroupRemoved(final ProcessGroup group) { + allProcessGroups.remove(group.getIdentifier()); + } + + public void onProcessorAdded(final ProcessorNode procNode) { + allProcessors.put(procNode.getIdentifier(), procNode); + } + + public void onProcessorRemoved(final ProcessorNode procNode) { + allProcessors.remove(procNode.getIdentifier()); + } + + public ProcessorNode getProcessorNode(final String id) { + return allProcessors.get(id); + } + + public void onConnectionAdded(final Connection connection) { + allConnections.put(connection.getIdentifier(), connection); + } + + public void onConnectionRemoved(final Connection connection) { + allConnections.remove(connection.getIdentifier()); + } + + public Connection getConnection(final String id) { + return allConnections.get(id); + } + + public void onInputPortAdded(final Port inputPort) { + allInputPorts.put(inputPort.getIdentifier(), inputPort); + } + + public void onInputPortRemoved(final Port inputPort) { + allInputPorts.remove(inputPort.getIdentifier()); + } + + public Port getInputPort(final String id) { + return allInputPorts.get(id); + } + + public void onOutputPortAdded(final Port outputPort) { + allOutputPorts.put(outputPort.getIdentifier(), outputPort); + } + + public void onOutputPortRemoved(final Port outputPort) { + allOutputPorts.remove(outputPort.getIdentifier()); + } + + public Port getOutputPort(final String id) { + return allOutputPorts.get(id); + } + + public void onFunnelAdded(final Funnel funnel) { + allFunnels.put(funnel.getIdentifier(), funnel); + } + + public void onFunnelRemoved(final Funnel funnel) { + allFunnels.remove(funnel.getIdentifier()); + } + + public Funnel getFunnel(final String id) { + return allFunnels.get(id); + } /** * Returns the status of all components in the controller. This request is * not in the context of a user so the results will be unfiltered. @@ -3487,7 +3563,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public int getActiveThreadCount() { - return getGroupStatus(getRootGroupId()).getActiveThreadCount(); + final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount(); + final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount(); + return timerDrivenCount + eventDrivenCount; } private RepositoryStatusReport getProcessorStats() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 6d96a5c24d..1ff09d795e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -964,9 +964,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isValid() { try { - final ValidationContext validationContext = this.getValidationContextFactory() - .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); - + final ValidationContext validationContext = getValidationContext(); final Collection validationResults = super.validate(validationContext); for (final ValidationResult result : validationResults) { @@ -1011,8 +1009,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off // we are willing to make in order to save on validation costs that would be unnecessary most of the time. if (getScheduledState() == ScheduledState.STOPPED) { - final ValidationContext validationContext = this.getValidationContextFactory() - .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier()); + final ValidationContext validationContext = getValidationContext(); final Collection validationResults = super.validate(validationContext); @@ -1111,6 +1108,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public synchronized void setProcessGroup(final ProcessGroup group) { this.processGroup.set(group); + invalidateValidationContext(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index e399e7ffbb..fa4ab84928 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -208,6 +208,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i writeLock.lock(); try { this.processGroup = group; + invalidateValidationContext(); } finally { writeLock.unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 2d006cc55f..401111322d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -30,6 +30,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -79,6 +81,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final FlowController flowController; private final NiFiProperties nifiProperties; + private final ConcurrentMap serviceCache = new ConcurrentHashMap<>(); + public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { @@ -158,6 +162,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + serviceCache.put(id, serviceNode); + return serviceNode; } catch (final Throwable t) { throw new ControllerServiceInstantiationException(t); @@ -222,6 +228,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id, new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true); + + serviceCache.putIfAbsent(id, serviceNode); return serviceNode; } @@ -459,12 +467,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) { - final ProcessGroup rootGroup = getRootGroup(); - // Find the Process Group that owns the component. ProcessGroup groupOfInterest = null; - final ProcessorNode procNode = rootGroup.findProcessor(componentId); + final ProcessorNode procNode = flowController.getProcessorNode(componentId); if (procNode == null) { final ControllerServiceNode serviceNode = getControllerServiceNode(componentId); if (serviceNode == null) { @@ -523,7 +529,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return rootServiceNode; } - return getRootGroup().findControllerService(serviceIdentifier); + return serviceCache.get(serviceIdentifier); } @Override @@ -533,15 +539,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi serviceNodes = flowController.getRootControllerServices(); } else { ProcessGroup group = getRootGroup(); - if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) { + if (FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) || group.getIdentifier().equals(groupId)) { + serviceNodes = new HashSet<>(serviceCache.values()); + } else { group = group.findProcessGroup(groupId); - } + if (group == null) { + return Collections.emptySet(); + } - if (group == null) { - return Collections.emptySet(); + serviceNodes = group.getControllerServices(true); } - - serviceNodes = group.getControllerServices(true); } final Set identifiers = new HashSet<>(); @@ -570,13 +577,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi group.removeControllerService(serviceNode); ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier()); + serviceCache.remove(serviceNode.getIdentifier()); } @Override public Set getAllControllerServices() { final Set allServices = new HashSet<>(); allServices.addAll(flowController.getRootControllerServices()); - allServices.addAll(getRootGroup().findAllControllerServices()); + allServices.addAll(serviceCache.values()); return allServices; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 452f3cc491..2907704e19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -405,6 +405,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); inputPorts.put(requireNonNull(port).getIdentifier(), port); + flowController.onInputPortAdded(port); } finally { writeLock.unlock(); } @@ -439,6 +440,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); } + flowController.onInputPortRemoved(port); LOG.info("Input Port {} removed from flow", port); } finally { writeLock.unlock(); @@ -484,6 +486,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); outputPorts.put(port.getIdentifier(), port); + flowController.onOutputPortAdded(port); } finally { writeLock.unlock(); } @@ -509,6 +512,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group"); } + flowController.onOutputPortRemoved(port); LOG.info("Output Port {} removed from flow", port); } finally { writeLock.unlock(); @@ -545,6 +549,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { group.setParent(this); processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); + flowController.onProcessGroupAdded(group); } finally { writeLock.unlock(); } @@ -584,6 +589,7 @@ public final class StandardProcessGroup implements ProcessGroup { removeComponents(group); processGroups.remove(group.getIdentifier()); + flowController.onProcessGroupRemoved(group); LOG.info("{} removed from flow", group); } finally { writeLock.unlock(); @@ -704,6 +710,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.setProcessGroup(this); processors.put(processorId, processor); + flowController.onProcessorAdded(processor); } finally { writeLock.unlock(); } @@ -745,6 +752,7 @@ public final class StandardProcessGroup implements ProcessGroup { } processors.remove(id); + flowController.onProcessorRemoved(processor); LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider(); @@ -884,6 +892,7 @@ public final class StandardProcessGroup implements ProcessGroup { destination.addConnection(connection); } connections.put(connection.getIdentifier(), connection); + flowController.onConnectionAdded(connection); } finally { writeLock.unlock(); } @@ -943,6 +952,7 @@ public final class StandardProcessGroup implements ProcessGroup { // remove the connection from our map connections.remove(connection.getIdentifier()); LOG.info("{} removed from flow", connection); + flowController.onConnectionRemoved(connection); } finally { writeLock.unlock(); } @@ -970,25 +980,21 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Connection findConnection(final String id) { - return findConnection(id, this); - } - - private Connection findConnection(final String id, final ProcessGroup start) { - Connection connection = start.getConnection(id); - if (connection != null) { - return connection; + final Connection connection = flowController.getConnection(id); + if (connection == null) { + return null; } - for (final ProcessGroup group : start.getProcessGroups()) { - connection = findConnection(id, group); - if (connection != null) { - return connection; - } + // We found a Connection in the Controller, but we only want to return it if + // the Process Group is this or is a child of this. + if (isOwner(connection.getProcessGroup())) { + return connection; } return null; } + @Override public List findAllConnections() { return findAllConnections(this); @@ -1386,19 +1392,19 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public ProcessGroup findProcessGroup(final String id) { - return findProcessGroup(requireNonNull(id), this); - } - - private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) { - if (id.equals(start.getIdentifier())) { - return start; + if (requireNonNull(id).equals(getIdentifier())) { + return this; } - for (final ProcessGroup group : start.getProcessGroups()) { - final ProcessGroup matching = findProcessGroup(id, group); - if (matching != null) { - return matching; - } + final ProcessGroup group = flowController.getGroup(id); + if (group == null) { + return null; + } + + // We found a Processor in the Controller, but we only want to return it if + // the Process Group is this or is a child of this. + if (isOwner(group.getParent())) { + return group; } return null; @@ -1453,23 +1459,30 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public ProcessorNode findProcessor(final String id) { - return findProcessor(id, this); - } + final ProcessorNode node = flowController.getProcessorNode(id); + if (node == null) { + return null; + } - private ProcessorNode findProcessor(final String id, final ProcessGroup start) { - ProcessorNode node = start.getProcessor(id); - if (node != null) { + // We found a Processor in the Controller, but we only want to return it if + // the Process Group is this or is a child of this. + if (isOwner(node.getProcessGroup())) { return node; } - for (final ProcessGroup group : start.getProcessGroups()) { - node = findProcessor(id, group); - if (node != null) { - return node; - } + return null; + } + + private boolean isOwner(ProcessGroup owner) { + while (owner != this && owner != null) { + owner = owner.getParent(); } - return null; + if (owner == this) { + return true; + } + + return false; } @Override @@ -1521,6 +1534,7 @@ public final class StandardProcessGroup implements ProcessGroup { return null; } + @Override public RemoteGroupPort findRemoteGroupPort(final String identifier) { return findRemoteGroupPort(identifier, this); } @@ -1584,7 +1598,16 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Port findInputPort(final String id) { - return findPort(id, this, new InputPortRetriever()); + final Port port = flowController.getInputPort(id); + if (port == null) { + return null; + } + + if (isOwner(port.getProcessGroup())) { + return port; + } + + return null; } @Override @@ -1602,7 +1625,16 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Port findOutputPort(final String id) { - return findPort(id, this, new OutputPortRetriever()); + final Port port = flowController.getOutputPort(id); + if (port == null) { + return null; + } + + if (isOwner(port.getProcessGroup())) { + return port; + } + + return null; } @Override @@ -1674,21 +1706,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) { - Port port = retriever.getPort(group, id); - if (port != null) { - return port; - } - - for (final ProcessGroup childGroup : group.getProcessGroups()) { - port = findPort(id, childGroup, retriever); - if (port != null) { - return port; - } - } - - return null; - } private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) { for (final Port port : retriever.getPorts(group)) { @@ -1716,6 +1733,7 @@ public final class StandardProcessGroup implements ProcessGroup { funnel.setProcessGroup(this); funnels.put(funnel.getIdentifier(), funnel); + flowController.onFunnelAdded(funnel); if (autoStart) { startFunnel(funnel); @@ -1737,25 +1755,19 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Funnel findFunnel(final String id) { - return findFunnel(id, this); - } - - private Funnel findFunnel(final String id, final ProcessGroup start) { - Funnel funnel = start.getFunnel(id); - if (funnel != null) { + final Funnel funnel = flowController.getFunnel(id); + if (funnel == null) { return funnel; } - for (final ProcessGroup group : start.getProcessGroups()) { - funnel = findFunnel(id, group); - if (funnel != null) { - return funnel; - } + if (isOwner(funnel.getProcessGroup())) { + return funnel; } return null; } + @Override public ControllerServiceNode findControllerService(final String id) { return findControllerService(id, this); @@ -1814,6 +1826,7 @@ public final class StandardProcessGroup implements ProcessGroup { } funnels.remove(funnel.getIdentifier()); + flowController.onFunnelRemoved(funnel); LOG.info("{} removed from flow", funnel); } finally { writeLock.unlock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index b024e52965..dfd627fc0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -16,7 +16,27 @@ */ package org.apache.nifi.controller.scheduling; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.FileUtils; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -61,29 +81,13 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; +import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.FileUtils; -import org.junit.After; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestStandardProcessScheduler { @@ -121,7 +125,26 @@ public class TestStandardProcessScheduler { taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, reloadComponent); controller = Mockito.mock(FlowController.class); - rootGroup = new MockProcessGroup(); + + final ConcurrentMap processorMap = new ConcurrentHashMap<>(); + Mockito.doAnswer(new Answer() { + @Override + public ProcessorNode answer(InvocationOnMock invocation) throws Throwable { + final String id = invocation.getArgumentAt(0, String.class); + return processorMap.get(id); + } + }).when(controller).getProcessorNode(Mockito.anyString()); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class); + processorMap.putIfAbsent(procNode.getIdentifier(), procNode); + return null; + } + }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class)); + + rootGroup = new MockProcessGroup(controller); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 3a28cb004a..e82085e2df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -42,9 +42,12 @@ import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.beans.PropertyDescriptor; import java.util.Arrays; @@ -55,6 +58,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -88,6 +93,7 @@ public class TestStandardControllerServiceProvider { private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; private static NiFiProperties niFiProperties; private static Bundle systemBundle; + private FlowController controller; @BeforeClass public static void setNiFiProps() { @@ -99,6 +105,29 @@ public class TestStandardControllerServiceProvider { ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); } + @Before + public void setup() { + controller = Mockito.mock(FlowController.class); + + final ConcurrentMap processorMap = new ConcurrentHashMap<>(); + Mockito.doAnswer(new Answer() { + @Override + public ProcessorNode answer(InvocationOnMock invocation) throws Throwable { + final String id = invocation.getArgumentAt(0, String.class); + return processorMap.get(id); + } + }).when(controller).getProcessorNode(Mockito.anyString()); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class); + processorMap.putIfAbsent(procNode.getIdentifier(), procNode); + return null; + } + }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class)); + } + private StandardProcessScheduler createScheduler() { return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, niFiProperties); } @@ -111,7 +140,7 @@ public class TestStandardControllerServiceProvider { @Test public void testDisableControllerService() { - final ProcessGroup procGroup = new MockProcessGroup(); + final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); @@ -127,7 +156,7 @@ public class TestStandardControllerServiceProvider { @Test(timeout = 10000) public void testEnableDisableWithReference() { - final ProcessGroup group = new MockProcessGroup(); + final ProcessGroup group = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group); @@ -190,7 +219,7 @@ public class TestStandardControllerServiceProvider { } public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) { - final ProcessGroup procGroup = new MockProcessGroup(); + final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); @@ -246,7 +275,7 @@ public class TestStandardControllerServiceProvider { @Test public void testOrderingOfServices() { - final ProcessGroup procGroup = new MockProcessGroup(); + final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); @@ -405,7 +434,7 @@ public class TestStandardControllerServiceProvider { new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties, VariableRegistry.EMPTY_REGISTRY, reloadComponent); - final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null, variableRegistry); + final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class), variableRegistry); group.addProcessor(procNode); procNode.setProcessGroup(group); @@ -414,7 +443,7 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableReferencingComponents() { - final ProcessGroup procGroup = new MockProcessGroup(); + final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); @@ -442,7 +471,7 @@ public class TestStandardControllerServiceProvider { FlowController controller = Mockito.mock(FlowController.class); StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); - ProcessGroup procGroup = new MockProcessGroup(); + ProcessGroup procGroup = new MockProcessGroup(controller); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); ControllerServiceNode A = provider.createControllerService(ServiceA.class.getName(), "A", @@ -493,7 +522,7 @@ public class TestStandardControllerServiceProvider { FlowController controller = Mockito.mock(FlowController.class); StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); - ProcessGroup procGroup = new MockProcessGroup(); + ProcessGroup procGroup = new MockProcessGroup(controller); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); ControllerServiceNode A = provider.createControllerService(ServiceC.class.getName(), "A", @@ -535,7 +564,7 @@ public class TestStandardControllerServiceProvider { FlowController controller = Mockito.mock(FlowController.class); StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); - ProcessGroup procGroup = new MockProcessGroup(); + ProcessGroup procGroup = new MockProcessGroup(controller); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 838c53cbe1..b6f70a12dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -17,6 +17,13 @@ package org.apache.nifi.controller.service.mock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.connectable.Connectable; @@ -25,6 +32,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; @@ -35,16 +43,14 @@ import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - public class MockProcessGroup implements ProcessGroup { private final Map serviceMap = new HashMap<>(); private final Map processorMap = new HashMap<>(); + private final FlowController flowController; + + public MockProcessGroup(final FlowController flowController) { + this.flowController = flowController; + } @Override public Authorizable getParentAuthorizable() { @@ -260,11 +266,13 @@ public class MockProcessGroup implements ProcessGroup { public void addProcessor(final ProcessorNode processor) { processor.setProcessGroup(this); processorMap.put(processor.getIdentifier(), processor); + flowController.onProcessorAdded(processor); } @Override public void removeProcessor(final ProcessorNode processor) { processorMap.remove(processor.getIdentifier()); + flowController.onProcessorRemoved(processor); } @Override diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java index f1cd126e25..0ad19cedbe 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java @@ -35,6 +35,15 @@ import org.apache.nifi.update.attributes.Rule; * */ public class CriteriaSerDe { + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(CriteriaBinding.class); + } catch (JAXBException e) { + throw new RuntimeException("Could not create JAXB Context for UpdateAttribute", e); + } + } /** * Handles the Criteria binding during the (de)serialization process. This @@ -86,8 +95,7 @@ public class CriteriaSerDe { binding.setRules(criteria.getRules()); // serialize the binding - final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class); - final Marshaller marshaller = context.createMarshaller(); + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); marshaller.marshal(binding, writer); @@ -110,8 +118,7 @@ public class CriteriaSerDe { if (string != null && !string.trim().equals("")) { try { // deserialize the binding - final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class); - final Unmarshaller unmarshaller = context.createUnmarshaller(); + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); final Source source = new StreamSource(new StringReader(string)); final JAXBElement element = unmarshaller.unmarshal(source, CriteriaBinding.class);