NIFI-10104 Refactored usage of PropertyEncryptor

This closes #6119

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2022-06-10 10:33:13 -07:00 committed by exceptionfactory
parent a30ac23e90
commit 2161d0fe9c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
28 changed files with 153 additions and 188 deletions

View File

@ -174,7 +174,7 @@ public class Node {
flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL);
flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator,
PropertyEncryptorFactory.getPropertyEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class));
revisionManager, Mockito.mock(Authorizer.class));
flowService.start();

View File

@ -544,7 +544,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new RuntimeException(e);
}
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
parameterContextManager = new StandardParameterContextManager();
@ -559,8 +559,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor, this.nifiProperties);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
@ -1496,7 +1496,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
* Synchronizes this controller with the proposed flow.
* <p>
* For more details, see
* {@link FlowSynchronizer#sync(FlowController, DataFlow, PropertyEncryptor, FlowService, BundleUpdateStrategy)}.
* {@link FlowSynchronizer#sync(FlowController, DataFlow, FlowService, BundleUpdateStrategy)}.
*
* @param synchronizer synchronizer
* @param dataFlow the flow to load the controller with. If the flow is null
@ -1521,7 +1521,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
LOG.debug("Synchronizing controller with proposed flow");
try {
synchronizer.sync(this, dataFlow, encryptor, flowService, bundleUpdateStrategy);
synchronizer.sync(this, dataFlow, flowService, bundleUpdateStrategy);
} catch (final UninheritableFlowException ufe) {
final NodeIdentifier localNodeId = getNodeId();
if (localNodeId != null) {

View File

@ -54,7 +54,6 @@ import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.StandardFlowSynchronizer;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.BundleUpdateStrategy;
@ -157,12 +156,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
public static StandardFlowService createStandaloneInstance(
final FlowController controller,
final NiFiProperties nifiProperties,
final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer,
final FlowSerializationStrategy serializationStrategy) throws IOException {
return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer,
return new StandardFlowService(controller, nifiProperties, null, false, null, revisionManager, authorizer,
serializationStrategy);
}
@ -171,11 +169,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
final ClusterCoordinator coordinator,
final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer,
return new StandardFlowService(controller, nifiProperties, senderListener, true, coordinator, revisionManager, authorizer,
FlowSerializationStrategy.WRITE_XML_AND_JSON);
}
@ -183,7 +180,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final FlowController controller,
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
final PropertyEncryptor encryptor,
final boolean configuredForClustering,
final ClusterCoordinator clusterCoordinator,
final RevisionManager revisionManager,
@ -193,10 +189,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.nifiProperties = nifiProperties;
this.controller = controller;
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = nifiProperties.getAutoResumeState();
dao = new StandardFlowConfigurationDAO(encryptor, nifiProperties, controller.getExtensionManager(), serializationStrategy);
dao = new StandardFlowConfigurationDAO(nifiProperties, controller.getExtensionManager(), serializationStrategy);
this.clusterCoordinator = clusterCoordinator;
if (clusterCoordinator != null) {
clusterCoordinator.setFlowService(this);
@ -236,7 +233,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.configuredForClustering = false;
this.senderListener = null;
}
}
@Override

View File

@ -141,21 +141,18 @@ import java.util.zip.GZIPInputStream;
public class XmlFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(XmlFlowSynchronizer.class);
private final PropertyEncryptor encryptor;
private final boolean autoResumeState;
private final NiFiProperties nifiProperties;
private final ExtensionManager extensionManager;
public XmlFlowSynchronizer(final PropertyEncryptor encryptor, final NiFiProperties nifiProperties, final ExtensionManager extensionManager) {
this.encryptor = encryptor;
public XmlFlowSynchronizer(final NiFiProperties nifiProperties, final ExtensionManager extensionManager) {
this.autoResumeState = nifiProperties.getAutoResumeState();
this.nifiProperties = nifiProperties;
this.extensionManager = extensionManager;
}
@Override
public void sync(final FlowController controller, final DataFlow proposedFlow, final PropertyEncryptor encryptor, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
public void sync(final FlowController controller, final DataFlow proposedFlow, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
final FlowManager flowManager = controller.getFlowManager();
@ -378,6 +375,8 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
final boolean flowAlreadySynchronized = controller.isFlowSynchronized();
final FlowManager flowManager = controller.getFlowManager();
final PropertyEncryptor encryptor = controller.getEncryptor();
// get the root element
final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
@ -424,10 +423,10 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
});
logger.trace("Adding root process group");
rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encodingVersion);
} else {
logger.trace("Updating root process group");
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encodingVersion);
}
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
@ -825,11 +824,13 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
}
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final PropertyEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
final FlowEncodingVersion encodingVersion) {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
final PropertyEncryptor encryptor = controller.getEncryptor();
// get the process group
final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
final FlowManager flowManager = controller.getFlowManager();
@ -1071,7 +1072,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
// update nested process groups (recursively)
final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encodingVersion);
}
// update connections
@ -1295,11 +1296,12 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
}
private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final PropertyEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
final FlowEncodingVersion encodingVersion) {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
final FlowManager flowManager = controller.getFlowManager();
final PropertyEncryptor encryptor = controller.getEncryptor();
// add the process group
final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
@ -1358,7 +1360,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addNestedProcessGroups(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
addProcessGroup(flowController, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
addProcessGroup(flowController, processGroup, nestedProcessGroupElement, encodingVersion);
}
}
@ -1397,6 +1399,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addControllerServices(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
final PropertyEncryptor encryptor = flowController.getEncryptor();
if (!serviceNodeList.isEmpty()) {
final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceNodeList, flowController, processGroup, encryptor, encodingVersion);
ControllerServiceLoader.enableControllerServices(controllerServices, flowController, encryptor, autoResumeState, encodingVersion);
@ -1405,6 +1408,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addProcessors(final Element processGroupElement, final ProcessGroup processGroup, final FlowController flowController, final FlowEncodingVersion encodingVersion) {
final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor");
final PropertyEncryptor encryptor = flowController.getEncryptor();
for (final Element processorElement : processorNodeList) {
final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor, encodingVersion);
@ -1557,6 +1561,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private void addRemoteProcessGroups(final Element processGroupElement, final ProcessGroup processGroup, final FlowController controller) {
final List<Element> remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup");
final PropertyEncryptor encryptor = controller.getEncryptor();
for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
final RemoteProcessGroup remoteGroup = controller.getFlowManager().createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
@ -1740,7 +1745,7 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
private byte[] toBytes(final FlowController flowController) throws FlowSerializationException {
final ByteArrayOutputStream result = new ByteArrayOutputStream();
final StandardFlowSerializer flowSerializer = new StandardFlowSerializer(encryptor);
final StandardFlowSerializer flowSerializer = new StandardFlowSerializer();
flowController.serialize(flowSerializer, result);
return result.toByteArray();
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
@ -38,26 +37,23 @@ public abstract class AbstractTimeBasedSchedulingAgent extends AbstractSchedulin
protected final FlowController flowController;
protected final RepositoryContextFactory contextFactory;
protected final PropertyEncryptor encryptor;
protected volatile String adminYieldDuration = "1 sec";
public AbstractTimeBasedSchedulingAgent(
final FlowEngine flowEngine,
final FlowController flowController,
final RepositoryContextFactory contextFactory,
final PropertyEncryptor encryptor
final RepositoryContextFactory contextFactory
) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = encryptor;
}
@Override
public void doScheduleOnce(final Connectable connectable, final LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
final List<ScheduledFuture<?>> futures = new ArrayList<>();
final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
final Runnable trigger = () -> {
connectableTask.invoke();

View File

@ -21,7 +21,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.quartz.CronExpression;
@ -37,8 +36,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory, final PropertyEncryptor encryptor) {
super(flowEngine, flowController, contextFactory, encryptor);
public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory) {
super(flowEngine, flowController, contextFactory);
}
@Override
@ -116,7 +115,7 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
final AtomicBoolean canceled = new AtomicBoolean(false);

View File

@ -40,7 +40,6 @@ import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContex
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
@ -91,13 +90,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ScheduledExecutorService componentLifeCycleThreadPool;
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true);
private final PropertyEncryptor encryptor;
public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController, final PropertyEncryptor encryptor,
public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController,
final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties) {
this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
this.flowController = flowController;
this.encryptor = encryptor;
this.stateManagerProvider = stateManagerProvider;
administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration();
@ -337,7 +333,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@ -377,7 +373,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@ -418,7 +414,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(procNode, false);
StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
LOG.info("Stopping {}", procNode);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), lifecycleState);
@ -567,7 +563,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
getSchedulingAgent(connectable).unschedule(connectable, state);
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, flowController.getEncryptor(), getStateManager(connectable.getIdentifier()));
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getClass(), connectable.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
}

View File

@ -22,7 +22,6 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.InvocationResult;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@ -37,8 +36,8 @@ public class TimerDrivenSchedulingAgent extends AbstractTimeBasedSchedulingAgent
private final long noWorkYieldNanos;
public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory,
final PropertyEncryptor encryptor, final NiFiProperties nifiProperties) {
super(flowEngine, flowController, contextFactory, encryptor);
final NiFiProperties nifiProperties) {
super(flowEngine, flowController, contextFactory);
final String boredYieldDuration = nifiProperties.getBoredYieldDuration();
try {
@ -69,7 +68,7 @@ public class TimerDrivenSchedulingAgent extends AbstractTimeBasedSchedulingAgent
@Override
public void doSchedule(final Connectable connectable, final LifecycleState scheduleState) {
final List<ScheduledFuture<?>> futures = new ArrayList<>();
final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState, encryptor);
final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
// Determine the task to run and create it.

View File

@ -20,7 +20,6 @@ import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.MissingBundleException;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.services.FlowService;
@ -36,7 +35,6 @@ public interface FlowSynchronizer {
*
* @param controller the flow controller
* @param dataFlow the flow to load the controller with. If the flow is null or zero length, then the controller must not have a flow or else an UninheritableFlowException will be thrown.
* @param encryptor used for the encryption/decryption of sensitive property values
* @param flowService the flow service
*
* @throws FlowSerializationException if proposed flow is not a valid flow configuration file
@ -44,7 +42,7 @@ public interface FlowSynchronizer {
* @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
* @throws MissingBundleException if the proposed flow cannot be loaded by the controller because it contains a bundle that is not available to the controller
*/
void sync(FlowController controller, DataFlow dataFlow, PropertyEncryptor encryptor, FlowService flowService, BundleUpdateStrategy bundleUpdateStrategy)
void sync(FlowController controller, DataFlow dataFlow, FlowService flowService, BundleUpdateStrategy bundleUpdateStrategy)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException;
}

View File

@ -77,15 +77,13 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
private static final String MAX_ENCODING_VERSION = "1.4";
private final PropertyEncryptor encryptor;
public StandardFlowSerializer(final PropertyEncryptor encryptor) {
this.encryptor = encryptor;
public StandardFlowSerializer() {
}
@Override
public Document transform(final FlowController controller, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException {
final PropertyEncryptor encryptor = controller.getEncryptor();
try {
// create a new, empty document
final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
@ -105,15 +103,15 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
final Element parameterContextsElement = doc.createElement("parameterContexts");
rootNode.appendChild(parameterContextsElement);
addParameterContexts(parameterContextsElement, controller.getFlowManager().getParameterContextManager());
addParameterContexts(parameterContextsElement, controller.getFlowManager().getParameterContextManager(), encryptor);
addProcessGroup(rootNode, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup);
addProcessGroup(rootNode, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup, encryptor);
// Add root-level controller services
final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode);
for (final ControllerServiceNode serviceNode : controller.getFlowManager().getRootControllerServices()) {
addControllerService(controllerServicesNode, serviceNode);
addControllerService(controllerServicesNode, serviceNode, encryptor);
}
final Element reportingTasksNode = doc.createElement("reportingTasks");
@ -146,7 +144,8 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
}
}
private void addParameterContexts(final Element parentElement, final ParameterContextManager parameterContextManager) {
private void addParameterContexts(final Element parentElement, final ParameterContextManager parameterContextManager,
final PropertyEncryptor encryptor) {
for (final ParameterContext parameterContext : parameterContextManager.getParameterContexts()) {
final Element parameterContextElement = parentElement.getOwnerDocument().createElement("parameterContext");
parentElement.appendChild(parameterContextElement);
@ -160,12 +159,12 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
}
for (final Parameter parameter : parameterContext.getParameters().values()) {
addParameter(parameterContextElement, parameter);
addParameter(parameterContextElement, parameter, encryptor);
}
}
}
private void addParameter(final Element parentElement, final Parameter parameter) {
private void addParameter(final Element parentElement, final Parameter parameter, final PropertyEncryptor encryptor) {
final Element parameterElement = parentElement.getOwnerDocument().createElement("parameter");
parentElement.appendChild(parameterElement);
@ -222,7 +221,8 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
parentElement.appendChild(element);
}
private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup,
final PropertyEncryptor encryptor) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
@ -251,7 +251,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
}
for (final ProcessorNode processor : group.getProcessors()) {
addProcessor(element, processor, scheduledStateLookup);
addProcessor(element, processor, scheduledStateLookup, encryptor);
}
for (final Port port : group.getInputPorts()) {
@ -279,11 +279,11 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
addProcessGroup(element, childGroup, "processGroup", scheduledStateLookup);
addProcessGroup(element, childGroup, "processGroup", scheduledStateLookup, encryptor);
}
for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
addRemoteProcessGroup(element, remoteRef, scheduledStateLookup);
addRemoteProcessGroup(element, remoteRef, scheduledStateLookup, encryptor);
}
for (final Connection connection : group.getConnections()) {
@ -291,7 +291,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
}
for (final ControllerServiceNode service : group.getControllerServices(false)) {
addControllerService(element, service);
addControllerService(element, service, encryptor);
}
for (final Template template : group.getTemplates()) {
@ -376,7 +376,8 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addPosition(element, funnel.getPosition());
}
private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef, final ScheduledStateLookup scheduledStateLookup) {
private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef, final ScheduledStateLookup scheduledStateLookup,
final PropertyEncryptor encryptor) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement("remoteProcessGroup");
parentElement.appendChild(element);
@ -484,7 +485,8 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
parentElement.appendChild(element);
}
private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup) {
private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup,
final PropertyEncryptor encryptor) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement("processor");
parentElement.appendChild(element);
@ -616,7 +618,7 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
parentElement.appendChild(element);
}
public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
public void addControllerService(final Element element, final ControllerServiceNode serviceNode, final PropertyEncryptor encryptor) {
final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
addTextElement(serviceElement, "id", serviceNode.getIdentifier());
addTextElement(serviceElement, "versionedComponentId", serviceNode.getVersionedComponentId());

View File

@ -22,7 +22,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.MissingBundleException;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.XmlFlowSynchronizer;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.services.FlowService;
@ -36,11 +35,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
@Override
public void sync(final FlowController controller, final DataFlow dataFlow, final PropertyEncryptor encryptor, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
public void sync(final FlowController controller, final DataFlow dataFlow, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException {
final FlowSynchronizer synchronizer = isXml(dataFlow) ? xmlFlowSynchronizer : versionedFlowSynchronizer;
synchronizer.sync(controller, dataFlow, encryptor, flowService, bundleUpdateStrategy);
synchronizer.sync(controller, dataFlow, flowService, bundleUpdateStrategy);
}
public static boolean isFlowEmpty(final DataFlow dataFlow) {

View File

@ -33,7 +33,6 @@ import java.io.OutputStream;
public class VersionedFlowSerializer implements FlowSerializer<VersionedDataflow> {
private static final ObjectMapper JSON_CODEC = new ObjectMapper();
private final PropertyEncryptor propertyEncryptor;
private final ExtensionManager extensionManager;
static {
@ -43,14 +42,14 @@ public class VersionedFlowSerializer implements FlowSerializer<VersionedDataflow
JSON_CODEC.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public VersionedFlowSerializer(final PropertyEncryptor propertyEncryptor, final ExtensionManager extensionManager) {
this.propertyEncryptor = propertyEncryptor;
public VersionedFlowSerializer(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
}
@Override
public VersionedDataflow transform(final FlowController controller, final ScheduledStateLookup stateLookup) throws FlowSerializationException {
final VersionedDataflowMapper dataflowMapper = new VersionedDataflowMapper(controller, extensionManager, propertyEncryptor::encrypt, stateLookup);
final PropertyEncryptor encryptor = controller.getEncryptor();
final VersionedDataflowMapper dataflowMapper = new VersionedDataflowMapper(controller, extensionManager, encryptor::encrypt, stateLookup);
final VersionedDataflow dataflow = dataflowMapper.createMapping();
return dataflow;
}

View File

@ -121,23 +121,18 @@ import java.util.zip.GZIPInputStream;
public class VersionedFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(VersionedFlowSynchronizer.class);
private static final String ENCRYPTED_VALUE_PREFIX = "enc{";
private static final String ENCRYPTED_VALUE_SUFFIX = "}";
private final PropertyEncryptor encryptor;
private final ExtensionManager extensionManager;
private final File flowStorageFile;
private final FlowConfigurationArchiveManager archiveManager;
public VersionedFlowSynchronizer(final PropertyEncryptor encryptor, final ExtensionManager extensionManager, final File flowStorageFile, final FlowConfigurationArchiveManager archiveManager) {
this.encryptor = encryptor;
public VersionedFlowSynchronizer(final ExtensionManager extensionManager, final File flowStorageFile, final FlowConfigurationArchiveManager archiveManager) {
this.extensionManager = extensionManager;
this.flowStorageFile = flowStorageFile;
this.archiveManager = archiveManager;
}
public synchronized void sync(final FlowController controller, final DataFlow proposedFlow, final PropertyEncryptor encryptor, final FlowService flowService,
public synchronized void sync(final FlowController controller, final DataFlow proposedFlow, final FlowService flowService,
final BundleUpdateStrategy bundleUpdateStrategy)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException {
@ -168,7 +163,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final DataFlow existingDataFlow = getExistingDataFlow(controller);
checkFlowInheritability(existingDataFlow, proposedFlow, controller, bundleUpdateStrategy);
final FlowComparison flowComparison = compareFlows(existingDataFlow, proposedFlow, encryptor);
final FlowComparison flowComparison = compareFlows(existingDataFlow, proposedFlow, controller.getEncryptor());
final Set<FlowDifference> flowDifferences = flowComparison.getDifferences();
if (flowDifferences.isEmpty()) {
logger.debug("No differences between current flow and proposed flow. Will not create backup of existing flow.");
@ -298,6 +293,8 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
try {
final VersionedDataflow versionedFlow = proposedFlow.getVersionedDataflow();
final PropertyEncryptor encryptor = controller.getEncryptor();
if (versionedFlow != null) {
controller.setMaxTimerDrivenThreadCount(versionedFlow.getMaxTimerDrivenThreadCount());
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
@ -504,7 +501,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
taskNode.setAnnotationData(reportingTask.getAnnotationData());
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(taskNode, reportingTask);
final Map<String, String> decryptedProperties = decryptProperties(reportingTask.getProperties());
final Map<String, String> decryptedProperties = decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
taskNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
// enable/disable/start according to the ScheduledState
@ -543,7 +540,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
Collectors.toMap(VersionedParameterContext::getName, Function.identity())
);
for (final VersionedParameterContext versionedParameterContext : parameterContexts) {
inheritParameterContext(versionedParameterContext, controller.getFlowManager(), namedParameterContexts);
inheritParameterContext(versionedParameterContext, controller.getFlowManager(), namedParameterContexts, controller.getEncryptor());
}
});
}
@ -551,23 +548,25 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
private void inheritParameterContext(
final VersionedParameterContext versionedParameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
final Map<String, VersionedParameterContext> namedParameterContexts,
final PropertyEncryptor encryptor
) {
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
final ParameterContext existingContext = contextManager.getParameterContextNameMapping().get(versionedParameterContext.getName());
if (existingContext == null) {
addParameterContext(versionedParameterContext, flowManager, namedParameterContexts);
addParameterContext(versionedParameterContext, flowManager, namedParameterContexts, encryptor);
} else {
updateParameterContext(versionedParameterContext, existingContext, flowManager, namedParameterContexts);
updateParameterContext(versionedParameterContext, existingContext, flowManager, namedParameterContexts, encryptor);
}
}
private void addParameterContext(
final VersionedParameterContext versionedParameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
final Map<String, VersionedParameterContext> namedParameterContexts,
final PropertyEncryptor encryptor
) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext, encryptor);
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
@ -606,7 +605,8 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
return referenceIds;
}
private Map<String, Parameter> createParameterMap(final VersionedParameterContext versionedParameterContext) {
private Map<String, Parameter> createParameterMap(final VersionedParameterContext versionedParameterContext,
final PropertyEncryptor encryptor) {
final Map<String, Parameter> parameters = new HashMap<>();
for (final VersionedParameter versioned : versionedParameterContext.getParameters()) {
final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
@ -619,9 +619,8 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final String rawValue = versioned.getValue();
if (rawValue == null) {
parameterValue = null;
} else if (versioned.isSensitive() && rawValue.startsWith(ENCRYPTED_VALUE_PREFIX) && rawValue.endsWith(ENCRYPTED_VALUE_SUFFIX)) {
final String extractedValue = rawValue.substring(ENCRYPTED_VALUE_PREFIX.length(), rawValue.length() - ENCRYPTED_VALUE_SUFFIX.length());
parameterValue = encryptor.decrypt(extractedValue);
} else if (versioned.isSensitive()) {
parameterValue = decrypt(rawValue, encryptor);
} else {
parameterValue = rawValue;
}
@ -637,9 +636,10 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final VersionedParameterContext versionedParameterContext,
final ParameterContext parameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
final Map<String, VersionedParameterContext> namedParameterContexts,
final PropertyEncryptor encryptor
) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext, encryptor);
final Map<String, String> currentValues = new HashMap<>();
parameterContext.getParameters().values().forEach(param -> currentValues.put(param.getDescriptor().getName(), param.getValue()));
@ -711,7 +711,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
for (final VersionedControllerService versionedControllerService : controllerServices) {
final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
if (controllerServicesAdded.contains(serviceNode) || affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier())) {
updateRootControllerService(serviceNode, versionedControllerService);
updateRootControllerService(serviceNode, versionedControllerService, controller.getEncryptor());
}
}
@ -745,7 +745,8 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
return serviceNode;
}
private void updateRootControllerService(final ControllerServiceNode serviceNode, final VersionedControllerService versionedControllerService) {
private void updateRootControllerService(final ControllerServiceNode serviceNode, final VersionedControllerService versionedControllerService,
final PropertyEncryptor encryptor) {
serviceNode.pauseValidationTrigger();
try {
serviceNode.setName(versionedControllerService.getName());
@ -753,7 +754,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
serviceNode.setComments(versionedControllerService.getComments());
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(serviceNode, versionedControllerService);
final Map<String, String> decryptedProperties = decryptProperties(versionedControllerService.getProperties());
final Map<String, String> decryptedProperties = decryptProperties(versionedControllerService.getProperties(), encryptor);
serviceNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
} finally {
serviceNode.resumeValidationTrigger();
@ -787,13 +788,13 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
.collect(Collectors.toSet());
}
private Map<String, String> decryptProperties(final Map<String, String> encrypted) {
private Map<String, String> decryptProperties(final Map<String, String> encrypted, final PropertyEncryptor encryptor) {
final Map<String, String> decrypted = new HashMap<>(encrypted.size());
encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value)));
encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value, encryptor)));
return decrypted;
}
private String decrypt(final String value) {
private String decrypt(final String value, final PropertyEncryptor encryptor) {
if (isValueSensitive(value)) {
try {
return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
@ -1003,7 +1004,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
private byte[] toBytes(final FlowController flowController) throws FlowSerializationException {
final ByteArrayOutputStream result = new ByteArrayOutputStream();
final FlowSerializer<VersionedDataflow> flowSerializer = new VersionedFlowSerializer(encryptor, extensionManager);
final FlowSerializer<VersionedDataflow> flowSerializer = new VersionedFlowSerializer(extensionManager);
flowController.serialize(flowSerializer, result);
return result.toByteArray();
}

View File

@ -69,10 +69,8 @@ public class ConnectableTask {
private final FlowController flowController;
private final int numRelationships;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState,
final PropertyEncryptor encryptor) {
final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState) {
this.schedulingAgent = schedulingAgent;
this.connectable = connectable;
@ -80,6 +78,8 @@ public class ConnectableTask {
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
final PropertyEncryptor encryptor = flowController.getEncryptor();
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
processContext = new StandardProcessContext(

View File

@ -30,7 +30,6 @@ import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.controller.serialization.StandardFlowSynchronizer;
import org.apache.nifi.controller.serialization.VersionedFlowSerializer;
import org.apache.nifi.controller.serialization.VersionedFlowSynchronizer;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.services.FlowService;
@ -56,7 +55,6 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
private final File xmlFile;
private final File jsonFile;
private final PropertyEncryptor encryptor;
private final FlowConfigurationArchiveManager archiveManager;
private final NiFiProperties nifiProperties;
private final ExtensionManager extensionManager;
@ -65,8 +63,8 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
private final String clusterFlowSerializationFormat;
private final FlowSerializationStrategy serializationStrategy;
public StandardFlowConfigurationDAO(final PropertyEncryptor encryptor, final NiFiProperties nifiProperties,
final ExtensionManager extensionManager, final FlowSerializationStrategy serializationStrategy) throws IOException {
public StandardFlowConfigurationDAO(final NiFiProperties nifiProperties, final ExtensionManager extensionManager,
final FlowSerializationStrategy serializationStrategy) throws IOException {
this.nifiProperties = nifiProperties;
this.clusterFlowSerializationFormat = nifiProperties.getProperty(CLUSTER_FLOW_SERIALIZATION_FORMAT);
this.serializationStrategy = serializationStrategy;
@ -85,7 +83,6 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
throw new IOException(jsonFile + " exists but you have insufficient read/write privileges");
}
this.encryptor = encryptor;
this.extensionManager = extensionManager;
this.archiveManager = new FlowConfigurationArchiveManager(nifiProperties);
@ -100,8 +97,8 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
public synchronized void load(final FlowController controller, final DataFlow dataFlow, final FlowService flowService, final BundleUpdateStrategy bundleUpdateStrategy)
throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
final VersionedFlowSynchronizer versionedFlowSynchronizer = new VersionedFlowSynchronizer(encryptor, extensionManager, nifiProperties.getFlowConfigurationJsonFile(), archiveManager);
final XmlFlowSynchronizer xmlFlowSynchronizer = new XmlFlowSynchronizer(encryptor, nifiProperties, extensionManager);
final VersionedFlowSynchronizer versionedFlowSynchronizer = new VersionedFlowSynchronizer(extensionManager, nifiProperties.getFlowConfigurationJsonFile(), archiveManager);
final XmlFlowSynchronizer xmlFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(xmlFlowSynchronizer, versionedFlowSynchronizer);
controller.synchronize(standardFlowSynchronizer, dataFlow, flowService, bundleUpdateStrategy);
@ -170,9 +167,9 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
// Serialize based on the serialization format configured for cluster communications. If not configured, use JSON.
final FlowSerializer<?> serializer;
if (FLOW_SERIALIZATION_FORMAT_XML.equalsIgnoreCase(clusterFlowSerializationFormat)) {
serializer = new StandardFlowSerializer(encryptor);
serializer = new StandardFlowSerializer();
} else {
serializer = new VersionedFlowSerializer(encryptor, extensionManager);
serializer = new VersionedFlowSerializer(extensionManager);
}
controller.serialize(serializer, os);
@ -196,13 +193,13 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
}
private void saveJson(final FlowController controller, final boolean archive) throws IOException {
final FlowSerializer<?> serializer = new VersionedFlowSerializer(controller.getEncryptor(), controller.getExtensionManager());
final FlowSerializer<?> serializer = new VersionedFlowSerializer(controller.getExtensionManager());
saveFlow(controller, serializer, jsonFile, archive);
jsonFileExists = true;
}
private void saveXml(final FlowController controller, final boolean archive) throws IOException {
final FlowSerializer<?> serializer = new StandardFlowSerializer(controller.getEncryptor());
final FlowSerializer<?> serializer = new StandardFlowSerializer();
saveFlow(controller, serializer, xmlFile, archive);
}

View File

@ -57,14 +57,12 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
properties,
nodeProtocolSenderListener,
clusterCoordinator,
encryptor,
revisionManager,
authorizer);
} else {
flowService = StandardFlowService.createStandaloneInstance(
flowController,
properties,
encryptor,
revisionManager,
authorizer,
FlowSerializationStrategy.WRITE_XML_AND_JSON);

View File

@ -94,7 +94,7 @@ public class StandardFlowServiceTest {
extensionManager = mock(ExtensionDiscoveringManager.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor,
new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer,
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, revisionManager, authorizer,
FlowSerializationStrategy.WRITE_XML_AND_JSON);
statusHistoryRepository = mock(StatusHistoryRepository.class);
}
@ -104,7 +104,7 @@ public class StandardFlowServiceTest {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>()));
StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
StandardFlowSerializer serializer = new StandardFlowSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP);
serializer.serialize(doc, baos);
@ -129,7 +129,7 @@ public class StandardFlowServiceTest {
flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>()));
StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
StandardFlowSerializer serializer = new StandardFlowSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP);
serializer.serialize(doc, baos);
@ -149,8 +149,7 @@ public class StandardFlowServiceTest {
flowService.load(new StandardDataFlow(updatedBytes, null, null, new HashSet<>()));
fail("should have thrown " + UninheritableFlowException.class);
} catch (UninheritableFlowException ufe) {
StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
StandardFlowSerializer serializer = new StandardFlowSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP);
serializer.serialize(doc, baos);
@ -172,8 +171,7 @@ public class StandardFlowServiceTest {
flowService.load(new StandardDataFlow(updatedBytes, null, null, new HashSet<>()));
fail("should have thrown " + FlowSerializationException.class);
} catch (FlowSerializationException ufe) {
StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
StandardFlowSerializer serializer = new StandardFlowSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP);
serializer.serialize(doc, baos);

View File

@ -292,7 +292,7 @@ public class StandardProcessorNodeIT {
final StateManagerProvider stateManagerProvider = mock(StateManagerProvider.class);
final ProcessScheduler processScheduler = new StandardProcessScheduler(null, flowController, null,
final ProcessScheduler processScheduler = new StandardProcessScheduler(null, flowController,
stateManagerProvider, nifiProperties);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, narBundle.getBundleDetails().getCoordinate(), componentLog);

View File

@ -198,8 +198,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
@ -260,8 +259,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
@ -302,8 +300,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
@ -317,8 +314,7 @@ public class TestFlowController {
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenAuthorizationsAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -339,8 +335,7 @@ public class TestFlowController {
@Test(expected = FlowSynchronizationException.class)
public void testSynchronizeFlowWithInvalidParameterContextReference() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow-error.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -358,8 +353,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithNestedParameterContexts() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -383,8 +377,7 @@ public class TestFlowController {
@Test
public void testCreateParameterContextWithAndWithoutValidation() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -432,8 +425,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenAuthorizationsAreDifferentAndFlowEmpty() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
@ -450,8 +442,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -476,8 +467,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNullAndEmptyFlow() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
@ -508,8 +498,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
@ -528,8 +517,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final Set<String> missingComponents = new HashSet<>();
missingComponents.add("1");
@ -549,7 +537,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException {
final PropertyEncryptor encryptor = PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(encryptor, nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
when(mockProcessorNode.getIdentifier()).thenReturn("1");
@ -584,7 +572,7 @@ public class TestFlowController {
when(proposedDataFlow.getMissingComponents()).thenReturn(new HashSet<>());
try {
standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, encryptor, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE);
standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE);
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Current flow has missing components that are not considered missing in the proposed flow (1,2,3)"));
@ -593,8 +581,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();
@ -605,8 +592,7 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();

View File

@ -143,7 +143,7 @@ public class TestStandardProcessScheduler {
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), null, stateMgrProvider, nifiProperties);
scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), stateMgrProvider, nifiProperties);
scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
reportingTask = new TestReportingTask();
@ -692,6 +692,6 @@ public class TestStandardProcessScheduler {
}
private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), null, stateMgrProvider, nifiProperties);
return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), stateMgrProvider, nifiProperties);
}
}

View File

@ -103,7 +103,7 @@ public class StandardFlowSerializerTest {
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager, Mockito.mock(StatusHistoryRepository.class));
serializer = new StandardFlowSerializer(encryptor);
serializer = new StandardFlowSerializer();
}
@After

View File

@ -101,7 +101,7 @@ public class StandardControllerServiceProviderIT {
@Test(timeout = 120000)
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException, ExecutionException {
final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class),
null, stateManagerProvider, niFiProperties);
stateManagerProvider, niFiProperties);
for (int i = 0; i < 5000; i++) {
testEnableReferencingServicesGraph(scheduler);

View File

@ -154,7 +154,7 @@ public class TestStandardControllerServiceProvider {
private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class),
null, stateManagerProvider, niFiProperties);
stateManagerProvider, niFiProperties);
}
private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) {

View File

@ -32,7 +32,6 @@ import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
import org.mockito.Mockito;
@ -61,10 +60,9 @@ public class TestConnectableTask {
Mockito.when(contextFactory.newProcessContext(Mockito.any(Connectable.class), Mockito.any(AtomicLong.class))).thenReturn(repoContext);
final LifecycleState scheduleState = new LifecycleState();
final PropertyEncryptor encryptor = Mockito.mock(PropertyEncryptor.class);
return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
flowController, contextFactory, scheduleState, encryptor);
flowController, contextFactory, scheduleState);
}
@Test

View File

@ -209,12 +209,12 @@ public class FingerprintFactoryTest {
final DocumentProvider documentProvider = new StandardDocumentProvider();
final Document doc = documentProvider.newDocument();
final FlowSerializer flowSerializer = new StandardFlowSerializer(encryptor);
final FlowSerializer flowSerializer = new StandardFlowSerializer();
final Method serializeMethod = StandardFlowSerializer.class.getDeclaredMethod(serializerMethodName,
Element.class, componentClass, ScheduledStateLookup.class);
Element.class, componentClass, ScheduledStateLookup.class, PropertyEncryptor.class);
serializeMethod.setAccessible(true);
final Element rootElement = doc.createElement("root");
serializeMethod.invoke(flowSerializer, rootElement, component, scheduledStateLookup);
serializeMethod.invoke(flowSerializer, rootElement, component, scheduledStateLookup, encryptor);
return rootElement;
}

View File

@ -285,10 +285,10 @@ public class FrameworkIntegrationTest {
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager, statusHistoryRepository);
}
processScheduler = new StandardProcessScheduler(flowEngine, flowController, encryptor, flowController.getStateManagerProvider(), nifiProperties);
processScheduler = new StandardProcessScheduler(flowEngine, flowController, flowController.getStateManagerProvider(), nifiProperties);
final RepositoryContextFactory repositoryContextFactory = flowController.getRepositoryContextFactory();
final SchedulingAgent timerDrivenSchedulingAgent = new TimerDrivenSchedulingAgent(flowController, flowEngine, repositoryContextFactory, encryptor, nifiProperties);
final SchedulingAgent timerDrivenSchedulingAgent = new TimerDrivenSchedulingAgent(flowController, flowEngine, repositoryContextFactory, nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenSchedulingAgent);
flowFileSwapManager = flowController.createSwapManager();
@ -335,7 +335,7 @@ public class FrameworkIntegrationTest {
logger.info("Shutting down for restart....");
// Save Flow to a byte array
final FlowConfigurationDAO flowDao = new StandardFlowConfigurationDAO(flowController.getEncryptor(), nifiProperties, getExtensionManager(), FlowSerializationStrategy.WRITE_XML_AND_JSON);
final FlowConfigurationDAO flowDao = new StandardFlowConfigurationDAO(nifiProperties, getExtensionManager(), FlowSerializationStrategy.WRITE_XML_AND_JSON);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
flowDao.save(flowController, baos);
final byte[] flowBytes = baos.toByteArray();
@ -354,7 +354,7 @@ public class FrameworkIntegrationTest {
initialize();
// Reload the flow
final FlowSynchronizer flowSynchronizer = new XmlFlowSynchronizer(flowController.getEncryptor(), nifiProperties, extensionManager);
final FlowSynchronizer flowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
flowController.synchronize(flowSynchronizer, new StandardDataFlow(flowBytes, null, null, Collections.emptySet()), Mockito.mock(FlowService.class),
BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);

View File

@ -154,7 +154,6 @@ public class HeadlessNiFiServer implements NiFiServer {
flowService = StandardFlowService.createStandaloneInstance(
flowController,
props,
encryptor,
null, // revision manager
authorizer,
FlowSerializationStrategy.WRITE_XML_ONLY);

View File

@ -47,7 +47,6 @@ import org.apache.nifi.controller.status.ProcessGroupStatus
import org.apache.nifi.controller.status.RunStatus
import org.apache.nifi.diagnostics.StorageUsage
import org.apache.nifi.diagnostics.SystemDiagnostics
import org.apache.nifi.encrypt.PropertyEncryptor
import org.apache.nifi.groups.ProcessGroup
import org.apache.nifi.groups.StandardProcessGroup
import org.apache.nifi.nar.ExtensionManager