mirror of https://github.com/apache/nifi.git
NIFI-11188 Removed ProcessContext encrypt and decrypt methods
- Removed unnecessary references to PropertyEncryptor from multiple framework components Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #6962.
This commit is contained in:
parent
0d4f1523fe
commit
284804ac42
|
@ -95,28 +95,6 @@ public interface ProcessContext extends PropertyContext, ClusterContext {
|
|||
*/
|
||||
Map<PropertyDescriptor, String> getProperties();
|
||||
|
||||
/**
|
||||
* Encrypts the given value using the password provided in the NiFi
|
||||
* Properties
|
||||
*
|
||||
* @deprecated Processors should not depend on framework encryption operations
|
||||
* @param unencrypted plaintext value
|
||||
* @return encrypted value
|
||||
*/
|
||||
@Deprecated
|
||||
String encrypt(String unencrypted);
|
||||
|
||||
/**
|
||||
* Decrypts the given value using the password provided in the NiFi
|
||||
* Properties
|
||||
*
|
||||
* @deprecated Processors should not depend on framework encryption operations
|
||||
* @param encrypted the encrypted value
|
||||
* @return the plaintext value
|
||||
*/
|
||||
@Deprecated
|
||||
String decrypt(String encrypted);
|
||||
|
||||
/**
|
||||
* @return a {@code ControllerServiceLookup} that can be used to obtain a
|
||||
* Controller Service
|
||||
|
|
|
@ -416,19 +416,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encrypt(final String unencrypted) {
|
||||
return "enc{" + unencrypted + "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decrypt(final String encrypted) {
|
||||
if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
|
||||
return encrypted.substring(4, encrypted.length() - 1);
|
||||
}
|
||||
return encrypted;
|
||||
}
|
||||
|
||||
public void setValidateExpressionUsage(final boolean validate) {
|
||||
allowExpressionValidation = validate;
|
||||
}
|
||||
|
|
|
@ -74,17 +74,6 @@ public class TestMockProcessContext {
|
|||
assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFakeEncryptionAndDecryption() {
|
||||
final DummyProcessor proc = new DummyProcessor();
|
||||
final MockProcessContext context = new MockProcessContext(proc);
|
||||
String subject = "foo";
|
||||
String encrypted = context.encrypt(subject);
|
||||
assertEquals(encrypted, "enc{foo}");
|
||||
String decrypted = context.decrypt(encrypted);
|
||||
assertEquals(decrypted, subject);
|
||||
}
|
||||
|
||||
private static class DummyProcessor extends AbstractProcessor {
|
||||
static final PropertyDescriptor REQUIRED_PROP = new PropertyDescriptor.Builder()
|
||||
.name("required")
|
||||
|
|
|
@ -25,9 +25,6 @@ import org.apache.nifi.connectable.Connectable;
|
|||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
||||
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||
import org.apache.nifi.expression.AttributeValueDecorator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
|
@ -51,16 +48,11 @@ import java.util.concurrent.TimeUnit;
|
|||
public class ConnectableProcessContext implements ProcessContext {
|
||||
|
||||
private final Connectable connectable;
|
||||
private final PropertyEncryptor propertyEncryptor;
|
||||
private final StateManager stateManager;
|
||||
|
||||
private final DeprecationLogger deprecationLogger;
|
||||
|
||||
public ConnectableProcessContext(final Connectable connectable, final PropertyEncryptor propertyEncryptor, final StateManager stateManager) {
|
||||
public ConnectableProcessContext(final Connectable connectable, final StateManager stateManager) {
|
||||
this.connectable = connectable;
|
||||
this.propertyEncryptor = propertyEncryptor;
|
||||
this.stateManager = stateManager;
|
||||
this.deprecationLogger = DeprecationLoggerFactory.getLogger(connectable.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -227,18 +219,6 @@ public class ConnectableProcessContext implements ProcessContext {
|
|||
return new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decrypt(String encrypted) {
|
||||
deprecationLogger.warn("ProcessContext.decrypt() should be replaced an alternative implementation");
|
||||
return propertyEncryptor.decrypt(encrypted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encrypt(String unencrypted) {
|
||||
deprecationLogger.warn("ProcessContext.encrypt() should be replaced an alternative implementation");
|
||||
return propertyEncryptor.encrypt(unencrypted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerServiceLookup getControllerServiceLookup() {
|
||||
return null;
|
||||
|
|
|
@ -557,7 +557,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
private void shutdown(final ProcessGroup procGroup) {
|
||||
for (final ProcessorNode node : procGroup.getProcessors()) {
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, node.getProcessor().getClass(), node.getIdentifier())) {
|
||||
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor,
|
||||
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider,
|
||||
getStateManager(node.getIdentifier()), () -> false, nodeTypeProvider);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
|
||||
}
|
||||
|
@ -1124,7 +1124,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, processor.getProcessor().getClass(), processor.getIdentifier())) {
|
||||
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor,
|
||||
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider,
|
||||
getStateManager(processor.getIdentifier()), () -> false, nodeTypeProvider);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
|
||||
} catch (final Exception e) {
|
||||
|
@ -3953,7 +3953,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
|
||||
private ProcessContext createProcessContext(final ProcessorNode processorNode) {
|
||||
return new StandardProcessContext(processorNode, controllerServiceProvider, encryptor,
|
||||
return new StandardProcessContext(processorNode, controllerServiceProvider,
|
||||
stateManagerProvider.getStateManager(processorNode.getIdentifier()), () -> false, nodeTypeProvider);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,9 +36,6 @@ import org.apache.nifi.controller.PropertyConfiguration;
|
|||
import org.apache.nifi.controller.PropertyConfigurationMapper;
|
||||
import org.apache.nifi.controller.lifecycle.TaskTermination;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
||||
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||
import org.apache.nifi.parameter.ParameterLookup;
|
||||
import org.apache.nifi.processor.exception.TerminatedTaskException;
|
||||
import org.apache.nifi.scheduling.ExecutionNode;
|
||||
|
@ -58,43 +55,67 @@ public class StandardProcessContext implements ProcessContext, ControllerService
|
|||
private final ProcessorNode procNode;
|
||||
private final ControllerServiceProvider controllerServiceProvider;
|
||||
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
|
||||
private final PropertyEncryptor propertyEncryptor;
|
||||
private final StateManager stateManager;
|
||||
private final TaskTermination taskTermination;
|
||||
private final NodeTypeProvider nodeTypeProvider;
|
||||
private final Map<PropertyDescriptor, String> properties;
|
||||
private final String annotationData;
|
||||
private final DeprecationLogger deprecationLogger;
|
||||
|
||||
public StandardProcessContext(
|
||||
final ProcessorNode processorNode,
|
||||
final ControllerServiceProvider controllerServiceProvider,
|
||||
final StateManager stateManager,
|
||||
final TaskTermination taskTermination,
|
||||
final NodeTypeProvider nodeTypeProvider
|
||||
) {
|
||||
|
||||
public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final PropertyEncryptor propertyEncryptor,
|
||||
final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider) {
|
||||
|
||||
this(processorNode, controllerServiceProvider, propertyEncryptor, stateManager, taskTermination, nodeTypeProvider,
|
||||
processorNode.getEffectivePropertyValues(), processorNode.getAnnotationData());
|
||||
this(
|
||||
processorNode,
|
||||
controllerServiceProvider,
|
||||
stateManager,
|
||||
taskTermination,
|
||||
nodeTypeProvider,
|
||||
processorNode.getEffectivePropertyValues(),
|
||||
processorNode.getAnnotationData()
|
||||
);
|
||||
}
|
||||
|
||||
public StandardProcessContext(final ProcessorNode processorNode, final Map<String, String> propertiesOverride, final String annotationDataOverride, final ParameterLookup parameterLookup,
|
||||
final ControllerServiceProvider controllerServiceProvider, final PropertyEncryptor propertyEncryptor,
|
||||
final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider) {
|
||||
|
||||
this(processorNode, controllerServiceProvider, propertyEncryptor, stateManager, taskTermination, nodeTypeProvider,
|
||||
resolvePropertyValues(processorNode, parameterLookup, propertiesOverride), annotationDataOverride);
|
||||
public StandardProcessContext(
|
||||
final ProcessorNode processorNode,
|
||||
final Map<String, String> propertiesOverride,
|
||||
final String annotationDataOverride,
|
||||
final ParameterLookup parameterLookup,
|
||||
final ControllerServiceProvider controllerServiceProvider,
|
||||
final StateManager stateManager,
|
||||
final TaskTermination taskTermination,
|
||||
final NodeTypeProvider nodeTypeProvider
|
||||
) {
|
||||
this(
|
||||
processorNode,
|
||||
controllerServiceProvider,
|
||||
stateManager,
|
||||
taskTermination,
|
||||
nodeTypeProvider,
|
||||
resolvePropertyValues(processorNode, parameterLookup, propertiesOverride),
|
||||
annotationDataOverride
|
||||
);
|
||||
}
|
||||
|
||||
public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final PropertyEncryptor propertyEncryptor,
|
||||
final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider,
|
||||
final Map<PropertyDescriptor, String> propertyValues, final String annotationData) {
|
||||
public StandardProcessContext(
|
||||
final ProcessorNode processorNode,
|
||||
final ControllerServiceProvider controllerServiceProvider,
|
||||
final StateManager stateManager,
|
||||
final TaskTermination taskTermination,
|
||||
final NodeTypeProvider nodeTypeProvider,
|
||||
final Map<PropertyDescriptor, String> propertyValues,
|
||||
final String annotationData
|
||||
) {
|
||||
this.procNode = processorNode;
|
||||
this.controllerServiceProvider = controllerServiceProvider;
|
||||
this.propertyEncryptor = propertyEncryptor;
|
||||
this.stateManager = stateManager;
|
||||
this.taskTermination = taskTermination;
|
||||
this.nodeTypeProvider = nodeTypeProvider;
|
||||
this.annotationData = annotationData;
|
||||
final Class<?> componentClass = processorNode.getComponentClass();
|
||||
final Class<?> loggerClass = componentClass == null ? getClass() : componentClass;
|
||||
this.deprecationLogger = DeprecationLoggerFactory.getLogger(loggerClass);
|
||||
|
||||
properties = Collections.unmodifiableMap(propertyValues);
|
||||
|
||||
|
@ -230,20 +251,6 @@ public class StandardProcessContext implements ProcessContext, ControllerService
|
|||
return propValueMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encrypt(final String unencrypted) {
|
||||
verifyTaskActive();
|
||||
deprecationLogger.warn("ProcessContext.encrypt() should be replaced an alternative implementation");
|
||||
return propertyEncryptor.encrypt(unencrypted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decrypt(final String encrypted) {
|
||||
verifyTaskActive();
|
||||
deprecationLogger.warn("ProcessContext.decrypt() should be replaced an alternative implementation");
|
||||
return propertyEncryptor.decrypt(encrypted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
|
||||
verifyTaskActive();
|
||||
|
|
|
@ -549,7 +549,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
|
||||
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
|
||||
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
|
||||
repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
|
||||
repositoryContextFactory, maxEventDrivenThreads.get(), extensionManager, this);
|
||||
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
|
||||
|
||||
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
|
||||
|
@ -983,7 +983,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
|
||||
final Processor processor = procNode.getProcessor();
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor,
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider,
|
||||
getStateManagerProvider().getStateManager(processor.getIdentifier()), () -> false, this);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, processContext);
|
||||
}
|
||||
|
|
|
@ -435,7 +435,7 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||
}
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
|
||||
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController);
|
||||
procNode.onConfigurationRestored(processContext);
|
||||
} finally {
|
||||
|
|
|
@ -74,7 +74,7 @@ public class StandardReloadComponent implements ReloadComponent {
|
|||
|
||||
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
|
||||
flowController.getEncryptor(), stateManager, () -> false, flowController);
|
||||
stateManager, () -> false, flowController);
|
||||
|
||||
// call OnRemoved for the existing processor using the previous instance class loader
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
|
|||
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
|
||||
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||
import org.apache.nifi.engine.FlowEngine;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.nar.ExtensionManager;
|
||||
|
@ -70,7 +69,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
|||
private final RepositoryContextFactory contextFactory;
|
||||
private final AtomicInteger maxThreadCount;
|
||||
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
|
||||
private final PropertyEncryptor encryptor;
|
||||
private final ExtensionManager extensionManager;
|
||||
private final NodeTypeProvider nodeTypeProvider;
|
||||
|
||||
|
@ -81,14 +79,13 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
|||
|
||||
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
|
||||
final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
|
||||
final PropertyEncryptor encryptor, final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
|
||||
final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
|
||||
super(flowEngine);
|
||||
this.serviceProvider = serviceProvider;
|
||||
this.stateManagerProvider = stateManagerProvider;
|
||||
this.workerQueue = workerQueue;
|
||||
this.contextFactory = contextFactory;
|
||||
this.maxThreadCount = new AtomicInteger(maxThreadCount);
|
||||
this.encryptor = encryptor;
|
||||
this.extensionManager = extensionManager;
|
||||
this.nodeTypeProvider = nodeTypeProvider;
|
||||
|
||||
|
@ -218,7 +215,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
|||
final ProcessorNode procNode = (ProcessorNode) connectable;
|
||||
final StateManager stateManager = new TaskTerminationAwareStateManager(getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
|
||||
final StandardProcessContext standardProcessContext = new StandardProcessContext(
|
||||
procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated, nodeTypeProvider);
|
||||
procNode, serviceProvider, stateManager, scheduleState::isTerminated, nodeTypeProvider);
|
||||
|
||||
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
|
||||
final ProcessSessionFactory sessionFactory;
|
||||
|
@ -295,7 +292,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
|
|||
} else {
|
||||
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker());
|
||||
final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);
|
||||
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
|
||||
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, getStateManager(connectable.getIdentifier()));
|
||||
trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory);
|
||||
|
||||
// See explanation above for the ProcessorNode as to why we do this.
|
||||
|
|
|
@ -333,7 +333,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
|
||||
|
||||
final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
|
||||
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
|
||||
|
@ -373,7 +373,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
|
||||
|
||||
final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
|
||||
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
|
||||
|
@ -414,7 +414,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
final LifecycleState lifecycleState = getLifecycleState(procNode, false);
|
||||
|
||||
StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
|
||||
flowController.getEncryptor(), getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
|
||||
|
||||
LOG.info("Stopping {}", procNode);
|
||||
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), lifecycleState);
|
||||
|
@ -563,7 +563,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
getSchedulingAgent(connectable).unschedule(connectable, state);
|
||||
|
||||
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
|
||||
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, flowController.getEncryptor(), getStateManager(connectable.getIdentifier()));
|
||||
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, getStateManager(connectable.getIdentifier()));
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getClass(), connectable.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContex
|
|||
import org.apache.nifi.controller.scheduling.LifecycleState;
|
||||
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
|
||||
import org.apache.nifi.controller.scheduling.SchedulingAgent;
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.nar.NarCloseable;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
@ -88,14 +87,12 @@ public class ConnectableTask {
|
|||
this.flowController = flowController;
|
||||
this.threadMXBean = ManagementFactory.getThreadMXBean();
|
||||
|
||||
final PropertyEncryptor encryptor = flowController.getEncryptor();
|
||||
|
||||
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
|
||||
if (connectable instanceof ProcessorNode) {
|
||||
processContext = new StandardProcessContext(
|
||||
(ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated, flowController);
|
||||
(ProcessorNode) connectable, flowController.getControllerServiceProvider(), stateManager, scheduleState::isTerminated, flowController);
|
||||
} else {
|
||||
processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
|
||||
processContext = new ConnectableProcessContext(connectable, stateManager);
|
||||
}
|
||||
|
||||
repositoryContext = contextFactory.newProcessContext(connectable, new AtomicLong(0L));
|
||||
|
|
|
@ -1,103 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.serialization
|
||||
|
||||
import org.apache.nifi.encrypt.EncryptionException
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import static groovy.test.GroovyAssert.shouldFail
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue
|
||||
|
||||
class FlowFromDOMFactoryTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FlowFromDOMFactoryTest.class)
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldDecryptSensitiveFlowValue() throws Exception {
|
||||
// Arrange
|
||||
final String property = "property"
|
||||
String wrappedProperty = "enc{${property}}"
|
||||
|
||||
PropertyEncryptor flowEncryptor = createEncryptor()
|
||||
|
||||
// Act
|
||||
String recovered = FlowFromDOMFactory.decrypt(wrappedProperty, flowEncryptor)
|
||||
logger.info("Recovered: ${recovered}")
|
||||
|
||||
// Assert
|
||||
assertEquals(property, recovered)
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldProvideBetterErrorMessageOnDecryptionFailure() throws Exception {
|
||||
// Arrange
|
||||
final String property = "property"
|
||||
String wrappedProperty = "enc{${property}}"
|
||||
|
||||
PropertyEncryptor flowEncryptor = createExceptionEncryptor()
|
||||
|
||||
// Act
|
||||
EncryptionException ee = assertThrows(EncryptionException.class,
|
||||
() -> FlowFromDOMFactory.decrypt(wrappedProperty, flowEncryptor))
|
||||
logger.expected(ee.getMessage())
|
||||
|
||||
// Assert
|
||||
assertTrue(ee.getMessage().contains("Check that the nifi.sensitive.props.key value " +
|
||||
"in nifi.properties matches the value used to encrypt the flow.xml.gz file"))
|
||||
}
|
||||
|
||||
private PropertyEncryptor createEncryptor() {
|
||||
return new PropertyEncryptor() {
|
||||
@Override
|
||||
String encrypt(String property) {
|
||||
return property;
|
||||
}
|
||||
|
||||
@Override
|
||||
String decrypt(String encryptedProperty) {
|
||||
return encryptedProperty;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private PropertyEncryptor createExceptionEncryptor() {
|
||||
return new PropertyEncryptor() {
|
||||
@Override
|
||||
String encrypt(String property) {
|
||||
return property;
|
||||
}
|
||||
|
||||
@Override
|
||||
String decrypt(String encryptedProperty) {
|
||||
throw new EncryptionException("Failed")
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -1,122 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.fingerprint
|
||||
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor
|
||||
import org.apache.nifi.encrypt.SensitiveValueEncoder
|
||||
import org.apache.nifi.nar.ExtensionManager
|
||||
import org.apache.nifi.nar.StandardExtensionDiscoveringManager
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.security.Security
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue
|
||||
|
||||
class FingerprintFactoryGroovyIT {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FingerprintFactoryGroovyIT.class)
|
||||
|
||||
private static PropertyEncryptor mockEncryptor = [
|
||||
encrypt: { String plaintext -> plaintext.reverse() },
|
||||
decrypt: { String cipherText -> cipherText.reverse() }] as PropertyEncryptor
|
||||
private static SensitiveValueEncoder mockSensitiveValueEncoder = [
|
||||
getEncoded: { String plaintext -> "[MASKED] (${plaintext.sha256()})".toString() }] as SensitiveValueEncoder
|
||||
private static ExtensionManager extensionManager = new StandardExtensionDiscoveringManager()
|
||||
|
||||
private static String originalPropertiesPath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH)
|
||||
private static final String NIFI_PROPERTIES_PATH = "src/test/resources/conf/nifi.properties"
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDownOnce() {
|
||||
if (originalPropertiesPath) {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, originalPropertiesPath)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The initial implementation derived the hashed value using a time/memory-hard algorithm (Argon2) every time.
|
||||
* For large flow definitions, this blocked startup for minutes. Deriving a secure key with the Argon2
|
||||
* algorithm once at startup (~1 second) and using this cached key for a simple HMAC/SHA-256 operation on every
|
||||
* fingerprint should be much faster.
|
||||
*/
|
||||
@Test
|
||||
void testCreateFingerprintShouldNotBeSlow() {
|
||||
// Arrange
|
||||
int testIterations = 100 //_000
|
||||
|
||||
// Set up test nifi.properties
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, NIFI_PROPERTIES_PATH)
|
||||
|
||||
// Create flow
|
||||
String initialFlowXML = new File("src/test/resources/nifi/fingerprint/initial.xml").text
|
||||
logger.info("Read initial flow: ${initialFlowXML[0..<100]}...")
|
||||
|
||||
// Create the FingerprintFactory with collaborators
|
||||
FingerprintFactory fingerprintFactory =
|
||||
new FingerprintFactory(mockEncryptor, extensionManager, mockSensitiveValueEncoder)
|
||||
|
||||
List<String> results = []
|
||||
def resultDurations = []
|
||||
|
||||
// Act
|
||||
testIterations.times { int i ->
|
||||
long startNanos = System.nanoTime()
|
||||
|
||||
// Create the fingerprint from the flow
|
||||
String fingerprint = fingerprintFactory.createFingerprint(initialFlowXML.bytes)
|
||||
|
||||
long endNanos = System.nanoTime()
|
||||
long durationNanos = endNanos - startNanos
|
||||
|
||||
logger.info("Generated flow fingerprint: ${fingerprint} in ${durationNanos} ns")
|
||||
|
||||
results << fingerprint
|
||||
resultDurations << durationNanos
|
||||
}
|
||||
|
||||
def milliDurations = [resultDurations.min(), resultDurations.max(), resultDurations.sum() / resultDurations.size()].collect { it / 1_000_000 }
|
||||
logger.info("Min/Max/Avg durations in ms: ${milliDurations}")
|
||||
|
||||
// Assert
|
||||
final long MAX_DURATION_NANOS = 1_000_000_000 // 1 second
|
||||
assertTrue(resultDurations.max() <= MAX_DURATION_NANOS * 2)
|
||||
assertTrue(resultDurations.sum() / testIterations < MAX_DURATION_NANOS)
|
||||
|
||||
// Assert the fingerprint does not contain the password
|
||||
results.forEach(fingerprint -> {
|
||||
assertFalse(fingerprint.contains("originalPlaintextPassword"))
|
||||
def maskedValue = (fingerprint =~ /\[MASKED\] \([\w\/\+=]+\)/)
|
||||
assertTrue(maskedValue.find())
|
||||
logger.info("Masked value: ${maskedValue[0]}")
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,93 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.fingerprint
|
||||
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor
|
||||
import org.apache.nifi.encrypt.SensitiveValueEncoder
|
||||
import org.apache.nifi.nar.ExtensionManager
|
||||
import org.apache.nifi.nar.StandardExtensionDiscoveringManager
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.security.Security
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue
|
||||
|
||||
class FingerprintFactoryGroovyTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FingerprintFactoryGroovyTest.class)
|
||||
|
||||
private static PropertyEncryptor mockEncryptor = [
|
||||
encrypt: { String plaintext -> plaintext.reverse() },
|
||||
decrypt: { String cipherText -> cipherText.reverse() }] as PropertyEncryptor
|
||||
private static SensitiveValueEncoder mockSensitiveValueEncoder = [
|
||||
getEncoded: { String plaintext -> "[MASKED] (${plaintext.sha256()})".toString() }] as SensitiveValueEncoder
|
||||
private static ExtensionManager extensionManager = new StandardExtensionDiscoveringManager()
|
||||
|
||||
private static String originalPropertiesPath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH)
|
||||
private static final String NIFI_PROPERTIES_PATH = "src/test/resources/conf/nifi.properties"
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDownOnce() {
|
||||
if (originalPropertiesPath) {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, originalPropertiesPath)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The flow fingerprint should not disclose sensitive property values.
|
||||
*/
|
||||
@Test
|
||||
void testCreateFingerprintShouldNotDiscloseSensitivePropertyValues() {
|
||||
// Arrange
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, NIFI_PROPERTIES_PATH)
|
||||
|
||||
// Create flow
|
||||
String initialFlowXML = new File("src/test/resources/nifi/fingerprint/initial.xml").text
|
||||
logger.info("Read initial flow: ${initialFlowXML[0..<100]}...")
|
||||
|
||||
// Create the FingerprintFactory with collaborators
|
||||
FingerprintFactory fingerprintFactory = new FingerprintFactory(mockEncryptor, extensionManager, mockSensitiveValueEncoder)
|
||||
|
||||
// Act
|
||||
|
||||
// Create the fingerprint from the flow
|
||||
String fingerprint = fingerprintFactory.createFingerprint(initialFlowXML.bytes)
|
||||
logger.info("Generated flow fingerprint: ${fingerprint}")
|
||||
|
||||
// Assert
|
||||
// Assert the fingerprint does not contain the password
|
||||
assertFalse(fingerprint.contains("originalPlaintextPassword"))
|
||||
def maskedValue = (fingerprint =~ /\[MASKED\] \([\w\/\+=]+\)/)
|
||||
assertTrue(maskedValue.find())
|
||||
logger.info("Masked value: ${maskedValue[0]}")
|
||||
}
|
||||
}
|
|
@ -152,7 +152,7 @@ public class StandardProcessorNodeIT {
|
|||
new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
|
||||
final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true);
|
||||
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false, null);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, () -> false, null);
|
||||
final SchedulingAgentCallback schedulingAgentCallback = new FailIfTriggeredSchedulingAgentCallback(taskScheduler);
|
||||
|
||||
procNode.performValidation();
|
||||
|
@ -535,7 +535,7 @@ public class StandardProcessorNodeIT {
|
|||
final ProcessorNode procNode = createProcessorNode(alwaysInvalid, new MockReloadComponent());
|
||||
|
||||
final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestStartInvalidProcessorThenStopFutureTriggered", true);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false, null);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, () -> false, null);
|
||||
final SchedulingAgentCallback schedulingAgentCallback = new FailIfTriggeredSchedulingAgentCallback(taskScheduler);
|
||||
|
||||
procNode.start(taskScheduler, 20000L, 10000L, () -> processContext, schedulingAgentCallback, true);
|
||||
|
|
|
@ -444,7 +444,7 @@ public class ParametersIT extends FrameworkIntegrationTest {
|
|||
properties.put("password", "#{pass}");
|
||||
usernamePassword.setProperties(properties);
|
||||
|
||||
final ProcessContext processContext = new StandardProcessContext(usernamePassword, getFlowController().getControllerServiceProvider(), getFlowController().getEncryptor(),
|
||||
final ProcessContext processContext = new StandardProcessContext(usernamePassword, getFlowController().getControllerServiceProvider(),
|
||||
getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false, getFlowController());
|
||||
final PropertyDescriptor descriptor = usernamePassword.getPropertyDescriptor("password");
|
||||
final PropertyValue propertyValue = processContext.getProperty(descriptor);
|
||||
|
|
|
@ -75,16 +75,6 @@ public class MockProcessContext implements ProcessContext {
|
|||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encrypt(String unencrypted) {
|
||||
return unencrypted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decrypt(String encrypted) {
|
||||
return encrypted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerServiceLookup getControllerServiceLookup() {
|
||||
return new MockControllerServiceLookup();
|
||||
|
|
|
@ -128,7 +128,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
configureProcessor(processor, processorDTO);
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(processor, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
final StandardProcessContext processContext = new StandardProcessContext(processor, flowController.getControllerServiceProvider(),
|
||||
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()), () -> false, flowController);
|
||||
processor.onConfigurationRestored(processContext);
|
||||
|
||||
|
@ -481,7 +481,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
final ProcessorNode processor = locateProcessor(processorId);
|
||||
|
||||
final ProcessContext processContext = new StandardProcessContext(processor, properties, processor.getAnnotationData(),
|
||||
processor.getProcessGroup().getParameterContext(), flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
processor.getProcessGroup().getParameterContext(), flowController.getControllerServiceProvider(),
|
||||
new NopStateManager(), () -> false, flowController);
|
||||
|
||||
final LogRepository logRepository = new NopLogRepository();
|
||||
|
|
|
@ -183,7 +183,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
|
|||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
|
||||
final StateManager stateManager = statelessEngine.getStateManagerProvider().getStateManager(id);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(),
|
||||
statelessEngine.getPropertyEncryptor(), stateManager, () -> false, new StatelessNodeTypeProvider());
|
||||
stateManager, () -> false, new StatelessNodeTypeProvider());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor(), processContext);
|
||||
}
|
||||
|
||||
|
|
|
@ -24,31 +24,27 @@ import org.apache.nifi.controller.NodeTypeProvider;
|
|||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.StandardProcessContext;
|
||||
|
||||
public class StatelessProcessContextFactory implements ProcessContextFactory {
|
||||
private static final NodeTypeProvider NODE_TYPE_PROVIDER = new StatelessNodeTypeProvider();
|
||||
private final ControllerServiceProvider controllerServiceProvider;
|
||||
private final PropertyEncryptor encryptor;
|
||||
private final StateManagerProvider stateManagerProvider;
|
||||
|
||||
public StatelessProcessContextFactory(final ControllerServiceProvider controllerServiceProvider, final PropertyEncryptor encryptor, final StateManagerProvider stateManagerProvider) {
|
||||
public StatelessProcessContextFactory(final ControllerServiceProvider controllerServiceProvider, final StateManagerProvider stateManagerProvider) {
|
||||
this.controllerServiceProvider = controllerServiceProvider;
|
||||
this.encryptor = encryptor;
|
||||
this.stateManagerProvider = stateManagerProvider;
|
||||
}
|
||||
|
||||
|
||||
public ProcessContext createProcessContext(final Connectable connectable) {
|
||||
final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
|
||||
|
||||
if (connectable instanceof ProcessorNode) {
|
||||
final ProcessorNode processor = (ProcessorNode) connectable;
|
||||
return new StandardProcessContext(processor, controllerServiceProvider, encryptor, stateManager, () -> false, NODE_TYPE_PROVIDER);
|
||||
return new StandardProcessContext(processor, controllerServiceProvider, stateManager, () -> false, NODE_TYPE_PROVIDER);
|
||||
}
|
||||
|
||||
return new ConnectableProcessContext(connectable, encryptor, stateManager);
|
||||
return new ConnectableProcessContext(connectable, stateManager);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ public class StatelessReloadComponent implements ReloadComponent {
|
|||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
|
||||
final StateManager stateManager = statelessEngine.getStateManagerProvider().getStateManager(id);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(existingNode, statelessEngine.getControllerServiceProvider(),
|
||||
statelessEngine.getPropertyEncryptor(), stateManager, () -> false, new StatelessNodeTypeProvider());
|
||||
stateManager, () -> false, new StatelessNodeTypeProvider());
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
|
||||
} finally {
|
||||
|
@ -107,7 +107,7 @@ public class StatelessReloadComponent implements ReloadComponent {
|
|||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(existingNode, statelessEngine.getControllerServiceProvider(),
|
||||
statelessEngine.getPropertyEncryptor(), statelessEngine.getStateManagerProvider().getStateManager(id), () -> false, new StatelessNodeTypeProvider());
|
||||
statelessEngine.getStateManagerProvider().getStateManager(id), () -> false, new StatelessNodeTypeProvider());
|
||||
existingNode.onConfigurationRestored(processContext);
|
||||
|
||||
logger.debug("Successfully reloaded {}", existingNode);
|
||||
|
|
|
@ -198,7 +198,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
|
|||
|
||||
final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
|
||||
|
||||
final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, lazyInitializedEncryptor, stateManagerProvider);
|
||||
final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, stateManagerProvider);
|
||||
final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
|
||||
contentRepo = createContentRepository(engineConfiguration);
|
||||
flowFileRepo = new StatelessFlowFileRepository();
|
||||
|
|
Loading…
Reference in New Issue