diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 5e239b25db..2000e36c28 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -174,7 +174,7 @@ public class Node {
flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL);
flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator,
- PropertyEncryptorFactory.getPropertyEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class));
+ revisionManager, Mockito.mock(Authorizer.class));
flowService.start();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ec5e438dc6..32616e8c8e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -544,7 +544,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new RuntimeException(e);
}
- processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
+ processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
parameterContextManager = new StandardParameterContextManager();
@@ -559,8 +559,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
- final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
- final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor, this.nifiProperties);
+ final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
+ final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
@@ -1496,7 +1496,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
* Synchronizes this controller with the proposed flow.
*
* For more details, see
- * {@link FlowSynchronizer#sync(FlowController, DataFlow, PropertyEncryptor, FlowService, BundleUpdateStrategy)}.
+ * {@link FlowSynchronizer#sync(FlowController, DataFlow, FlowService, BundleUpdateStrategy)}.
*
* @param synchronizer synchronizer
* @param dataFlow the flow to load the controller with. If the flow is null
@@ -1521,7 +1521,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
LOG.debug("Synchronizing controller with proposed flow");
try {
- synchronizer.sync(this, dataFlow, encryptor, flowService, bundleUpdateStrategy);
+ synchronizer.sync(this, dataFlow, flowService, bundleUpdateStrategy);
} catch (final UninheritableFlowException ufe) {
final NodeIdentifier localNodeId = getNodeId();
if (localNodeId != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index c3eb799f5e..16db9a07f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -54,7 +54,6 @@ import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.StandardFlowSynchronizer;
import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.BundleUpdateStrategy;
@@ -157,12 +156,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
public static StandardFlowService createStandaloneInstance(
final FlowController controller,
final NiFiProperties nifiProperties,
- final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer,
final FlowSerializationStrategy serializationStrategy) throws IOException {
- return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer,
+ return new StandardFlowService(controller, nifiProperties, null, false, null, revisionManager, authorizer,
serializationStrategy);
}
@@ -171,11 +169,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
final ClusterCoordinator coordinator,
- final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
- return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer,
+ return new StandardFlowService(controller, nifiProperties, senderListener, true, coordinator, revisionManager, authorizer,
FlowSerializationStrategy.WRITE_XML_AND_JSON);
}
@@ -183,7 +180,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final FlowController controller,
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
- final PropertyEncryptor encryptor,
final boolean configuredForClustering,
final ClusterCoordinator clusterCoordinator,
final RevisionManager revisionManager,
@@ -193,10 +189,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.nifiProperties = nifiProperties;
this.controller = controller;
+
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = nifiProperties.getAutoResumeState();
- dao = new StandardFlowConfigurationDAO(encryptor, nifiProperties, controller.getExtensionManager(), serializationStrategy);
+ dao = new StandardFlowConfigurationDAO(nifiProperties, controller.getExtensionManager(), serializationStrategy);
this.clusterCoordinator = clusterCoordinator;
if (clusterCoordinator != null) {
clusterCoordinator.setFlowService(this);
@@ -236,7 +233,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.configuredForClustering = false;
this.senderListener = null;
}
-
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
index 63d2b63a7c..86b5323baa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/XmlFlowSynchronizer.java
@@ -141,21 +141,18 @@ import java.util.zip.GZIPInputStream;
public class XmlFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(XmlFlowSynchronizer.class);
- private final PropertyEncryptor encryptor;
private final boolean autoResumeState;
private final NiFiProperties nifiProperties;
private final ExtensionManager extensionManager;
- public XmlFlowSynchronizer(final PropertyEncryptor encryptor, final NiFiProperties nifiProperties, final ExtensionManager extensionManager) {
- this.encryptor = encryptor;
+ public XmlFlowSynchronizer(final NiFiProperties nifiProperties, final ExtensionManager extensionManager) {
this.autoResumeState = nifiProperties.getAutoResumeState();
this.nifiProperties = nifiProperties;
this.extensionManager = extensionManager;
}
-
@Override
- public void sync(final FlowController controller, final DataFlow proposedFlow, final PropertyEncryptor encryptor, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
+ public void sync(final FlowController controller, final DataFlow proposedFlow, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
final FlowManager flowManager = controller.getFlowManager();
@@ -378,6 +375,8 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
final boolean flowAlreadySynchronized = controller.isFlowSynchronized();
final FlowManager flowManager = controller.getFlowManager();
+ final PropertyEncryptor encryptor = controller.getEncryptor();
+
// get the root element
final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
@@ -424,10 +423,10 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
});
logger.trace("Adding root process group");
- rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
+ rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encodingVersion);
} else {
logger.trace("Updating root process group");
- rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
+ rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encodingVersion);
}
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
@@ -825,11 +824,13 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
}
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
- final PropertyEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
+ final FlowEncodingVersion encodingVersion) {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
+ final PropertyEncryptor encryptor = controller.getEncryptor();
+
// get the process group
final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
final FlowManager flowManager = controller.getFlowManager();
@@ -1071,7 +1072,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
// update nested process groups (recursively)
final List nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
- updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
+ updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encodingVersion);
}
// update connections
@@ -1295,11 +1296,12 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
}
private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
- final PropertyEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
+ final FlowEncodingVersion encodingVersion) {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
final FlowManager flowManager = controller.getFlowManager();
+ final PropertyEncryptor encryptor = controller.getEncryptor();
// add the process group
final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
@@ -1358,7 +1360,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addNestedProcessGroups(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
- addProcessGroup(flowController, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
+ addProcessGroup(flowController, processGroup, nestedProcessGroupElement, encodingVersion);
}
}
@@ -1397,6 +1399,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addControllerServices(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
+ final PropertyEncryptor encryptor = flowController.getEncryptor();
if (!serviceNodeList.isEmpty()) {
final Map controllerServices = ControllerServiceLoader.loadControllerServices(serviceNodeList, flowController, processGroup, encryptor, encodingVersion);
ControllerServiceLoader.enableControllerServices(controllerServices, flowController, encryptor, autoResumeState, encodingVersion);
@@ -1405,6 +1408,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addProcessors(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List processorNodeList = getChildrenByTagName(processGroupElement, "processor");
+ final PropertyEncryptor encryptor = flowController.getEncryptor();
for (final Element processorElement : processorNodeList) {
final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor, encodingVersion);
@@ -1557,6 +1561,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addRemoteProcessGroups(final Element processGroupElement, final ProcessGroup processGroup, final FlowController controller) {
final List remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup");
+ final PropertyEncryptor encryptor = controller.getEncryptor();
for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
final RemoteProcessGroup remoteGroup = controller.getFlowManager().createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
@@ -1740,7 +1745,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private byte[] toBytes(final FlowController flowController) throws FlowSerializationException {
final ByteArrayOutputStream result = new ByteArrayOutputStream();
- final StandardFlowSerializer flowSerializer = new StandardFlowSerializer(encryptor);
+ final StandardFlowSerializer flowSerializer = new StandardFlowSerializer();
flowController.serialize(flowSerializer, result);
return result.toByteArray();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java
index cca4f999d3..2f3659a8ae 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.tasks.ConnectableTask;
-import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
@@ -38,26 +37,23 @@ public abstract class AbstractTimeBasedSchedulingAgent extends AbstractSchedulin
protected final FlowController flowController;
protected final RepositoryContextFactory contextFactory;
- protected final PropertyEncryptor encryptor;
protected volatile String adminYieldDuration = "1 sec";
public AbstractTimeBasedSchedulingAgent(
- final FlowEngine flowEngine,
- final FlowController flowController,
- final RepositoryContextFactory contextFactory,
- final PropertyEncryptor encryptor
+ final FlowEngine flowEngine,
+ final FlowController flowController,
+ final RepositoryContextFactory contextFactory
) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
- this.encryptor = encryptor;
}
@Override
public void doScheduleOnce(final Connectable connectable, final LifecycleState scheduleState, Callable> stopCallback) {
final List> futures = new ArrayList<>();
- final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
+ final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
final Runnable trigger = () -> {
connectableTask.invoke();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 89ec74ec61..71c590d6b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -21,7 +21,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
-import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.quartz.CronExpression;
@@ -37,8 +36,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private final Map