mirror of https://github.com/apache/nifi.git
NIFI-13560 Changed Parameter Provider handling to avoid storing values (#9102)
- Added ParameterValueMapper for handling serialization of Parameter Values for Flow Configuration - Added Parameter Group retrieval method for Flow Synchronizer
This commit is contained in:
parent
7c4c5ae693
commit
b0f419be2c
|
@ -339,6 +339,20 @@ public class StandardParameterProviderNode extends AbstractComponentNode impleme
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<ParameterGroup> findFetchedParameterGroup(final String parameterGroupName) {
|
||||||
|
Objects.requireNonNull(parameterGroupName, "Parameter Group Name required");
|
||||||
|
|
||||||
|
readLock.lock();
|
||||||
|
try {
|
||||||
|
return fetchedParameterGroups.stream()
|
||||||
|
.filter(parameterGroup -> parameterGroup.getGroupName().equals(parameterGroupName))
|
||||||
|
.findFirst();
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanApplyParameters(final Collection<ParameterGroupConfiguration> parameterGroupConfigurations) {
|
public void verifyCanApplyParameters(final Collection<ParameterGroupConfiguration> parameterGroupConfigurations) {
|
||||||
if (fetchedParameterGroups.isEmpty()) {
|
if (fetchedParameterGroups.isEmpty()) {
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.parameter;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filter values and return null for Parameter values with sensitive descriptors
|
||||||
|
*/
|
||||||
|
public class FilterSensitiveParameterValueMapper implements ParameterValueMapper {
|
||||||
|
/**
|
||||||
|
* Get mapped Parameter value based on properties
|
||||||
|
*
|
||||||
|
* @param parameter Parameter with descriptor of attributes for mapping
|
||||||
|
* @param value Parameter value to be mapped
|
||||||
|
* @return Mapped Parameter value
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getMapped(final Parameter parameter, final String value) {
|
||||||
|
Objects.requireNonNull(parameter, "Parameter required");
|
||||||
|
|
||||||
|
final ParameterDescriptor descriptor = parameter.getDescriptor();
|
||||||
|
final String mapped;
|
||||||
|
|
||||||
|
if (descriptor.isSensitive()) {
|
||||||
|
mapped = null;
|
||||||
|
} else {
|
||||||
|
mapped = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mapped;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.parameter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstraction for mapping Parameter values from runtime representation to versioned representation
|
||||||
|
*/
|
||||||
|
public interface ParameterValueMapper {
|
||||||
|
/**
|
||||||
|
* Get mapped Parameter value based on properties
|
||||||
|
*
|
||||||
|
* @param parameter Parameter with descriptor of attributes for mapping
|
||||||
|
* @param value Parameter value to be mapped
|
||||||
|
* @return Mapped Parameter value
|
||||||
|
*/
|
||||||
|
String getMapped(Parameter parameter, String value);
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.parameter;
|
||||||
|
|
||||||
|
import org.apache.nifi.registry.flow.mapping.SensitiveValueEncryptor;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Standard implementation with encryptor for sensitive values
|
||||||
|
*/
|
||||||
|
public class StandardParameterValueMapper implements ParameterValueMapper {
|
||||||
|
static final String PROVIDED_MAPPING = "provided:parameter";
|
||||||
|
|
||||||
|
private static final String ENCRYPTED_FORMAT = "enc{%s}";
|
||||||
|
|
||||||
|
private final SensitiveValueEncryptor sensitiveValueEncryptor;
|
||||||
|
|
||||||
|
public StandardParameterValueMapper(final SensitiveValueEncryptor sensitiveValueEncryptor) {
|
||||||
|
this.sensitiveValueEncryptor = sensitiveValueEncryptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get mapped Parameter value based on properties
|
||||||
|
*
|
||||||
|
* @param parameter Parameter with descriptor of attributes for mapping
|
||||||
|
* @param value Parameter value to be mapped
|
||||||
|
* @return Mapped Parameter value
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getMapped(final Parameter parameter, final String value) {
|
||||||
|
Objects.requireNonNull(parameter, "Parameter required");
|
||||||
|
|
||||||
|
final ParameterDescriptor descriptor = parameter.getDescriptor();
|
||||||
|
final String mapped;
|
||||||
|
|
||||||
|
if (parameter.isProvided()) {
|
||||||
|
mapped = PROVIDED_MAPPING;
|
||||||
|
} else if (descriptor.isSensitive()) {
|
||||||
|
if (sensitiveValueEncryptor == null) {
|
||||||
|
mapped = value;
|
||||||
|
} else {
|
||||||
|
final String encrypted = sensitiveValueEncryptor.encrypt(value);
|
||||||
|
mapped = ENCRYPTED_FORMAT.formatted(encrypted);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mapped = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mapped;
|
||||||
|
}
|
||||||
|
}
|
|
@ -73,12 +73,15 @@ import org.apache.nifi.flow.VersionedResourceType;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.groups.RemoteProcessGroup;
|
import org.apache.nifi.groups.RemoteProcessGroup;
|
||||||
import org.apache.nifi.nar.ExtensionManager;
|
import org.apache.nifi.nar.ExtensionManager;
|
||||||
|
import org.apache.nifi.parameter.FilterSensitiveParameterValueMapper;
|
||||||
import org.apache.nifi.parameter.Parameter;
|
import org.apache.nifi.parameter.Parameter;
|
||||||
import org.apache.nifi.parameter.ParameterContext;
|
import org.apache.nifi.parameter.ParameterContext;
|
||||||
import org.apache.nifi.parameter.ParameterDescriptor;
|
import org.apache.nifi.parameter.ParameterDescriptor;
|
||||||
import org.apache.nifi.parameter.ParameterProvider;
|
import org.apache.nifi.parameter.ParameterProvider;
|
||||||
import org.apache.nifi.parameter.ParameterProviderConfiguration;
|
import org.apache.nifi.parameter.ParameterProviderConfiguration;
|
||||||
import org.apache.nifi.parameter.ParameterReferencedControllerServiceData;
|
import org.apache.nifi.parameter.ParameterReferencedControllerServiceData;
|
||||||
|
import org.apache.nifi.parameter.ParameterValueMapper;
|
||||||
|
import org.apache.nifi.parameter.StandardParameterValueMapper;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
|
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
|
||||||
import org.apache.nifi.registry.flow.VersionControlInformation;
|
import org.apache.nifi.registry.flow.VersionControlInformation;
|
||||||
|
@ -107,6 +110,7 @@ public class NiFiRegistryFlowMapper {
|
||||||
|
|
||||||
private final ExtensionManager extensionManager;
|
private final ExtensionManager extensionManager;
|
||||||
private final FlowMappingOptions flowMappingOptions;
|
private final FlowMappingOptions flowMappingOptions;
|
||||||
|
private final ParameterValueMapper parameterValueMapper;
|
||||||
|
|
||||||
// We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
|
// We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
|
||||||
// we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
|
// we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
|
||||||
|
@ -121,6 +125,12 @@ public class NiFiRegistryFlowMapper {
|
||||||
public NiFiRegistryFlowMapper(final ExtensionManager extensionManager, final FlowMappingOptions flowMappingOptions) {
|
public NiFiRegistryFlowMapper(final ExtensionManager extensionManager, final FlowMappingOptions flowMappingOptions) {
|
||||||
this.extensionManager = extensionManager;
|
this.extensionManager = extensionManager;
|
||||||
this.flowMappingOptions = flowMappingOptions;
|
this.flowMappingOptions = flowMappingOptions;
|
||||||
|
|
||||||
|
if (flowMappingOptions.isMapSensitiveConfiguration()) {
|
||||||
|
this.parameterValueMapper = new StandardParameterValueMapper(flowMappingOptions.getSensitiveValueEncryptor());
|
||||||
|
} else {
|
||||||
|
this.parameterValueMapper = new FilterSensitiveParameterValueMapper();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -911,10 +921,10 @@ public class NiFiRegistryFlowMapper {
|
||||||
if (referencedControllerServiceData.isEmpty()) {
|
if (referencedControllerServiceData.isEmpty()) {
|
||||||
versionedParameter = mapParameter(parameter);
|
versionedParameter = mapParameter(parameter);
|
||||||
} else {
|
} else {
|
||||||
versionedParameter = mapParameter(
|
final String referencedVersionServiceId = referencedControllerServiceData.getFirst().getVersionedServiceId();
|
||||||
parameter,
|
final String parameterValue = parameter.getValue();
|
||||||
getId(Optional.ofNullable(referencedControllerServiceData.get(0).getVersionedServiceId()), parameter.getValue())
|
final String serviceId = getId(Optional.ofNullable(referencedVersionServiceId), parameterValue);
|
||||||
);
|
versionedParameter = mapParameter(parameter, serviceId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
versionedParameter = mapParameter(parameter);
|
versionedParameter = mapParameter(parameter);
|
||||||
|
@ -954,19 +964,8 @@ public class NiFiRegistryFlowMapper {
|
||||||
versionedParameter.setSensitive(descriptor.isSensitive());
|
versionedParameter.setSensitive(descriptor.isSensitive());
|
||||||
versionedParameter.setProvided(parameter.isProvided());
|
versionedParameter.setProvided(parameter.isProvided());
|
||||||
|
|
||||||
final boolean mapParameterValue = flowMappingOptions.isMapSensitiveConfiguration() || !descriptor.isSensitive();
|
final String mapped = parameterValueMapper.getMapped(parameter, value);
|
||||||
final String parameterValue;
|
versionedParameter.setValue(mapped);
|
||||||
if (mapParameterValue) {
|
|
||||||
if (descriptor.isSensitive()) {
|
|
||||||
parameterValue = encrypt(value);
|
|
||||||
} else {
|
|
||||||
parameterValue = value;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
parameterValue = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
versionedParameter.setValue(parameterValue);
|
|
||||||
return versionedParameter;
|
return versionedParameter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.parameter;
|
||||||
|
|
||||||
|
import org.apache.nifi.registry.flow.mapping.SensitiveValueEncryptor;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class TestStandardParameterValueMapper {
|
||||||
|
private static final String NAME = "NamedParameter";
|
||||||
|
|
||||||
|
private static final String VALUE = "ParameterValue";
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private SensitiveValueEncryptor sensitiveValueEncryptor;
|
||||||
|
|
||||||
|
private StandardParameterValueMapper mapper;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setMapper() {
|
||||||
|
mapper = new StandardParameterValueMapper(sensitiveValueEncryptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetMappedNotSensitiveNotProvided() {
|
||||||
|
final Parameter parameter = getParameter(false, false);
|
||||||
|
|
||||||
|
final String mapped = mapper.getMapped(parameter, VALUE);
|
||||||
|
|
||||||
|
assertEquals(VALUE, mapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetMappedNotSensitiveProvided() {
|
||||||
|
final Parameter parameter = getParameter(false, true);
|
||||||
|
|
||||||
|
final String mapped = mapper.getMapped(parameter, VALUE);
|
||||||
|
|
||||||
|
assertEquals(StandardParameterValueMapper.PROVIDED_MAPPING, mapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetMappedSensitiveProvided() {
|
||||||
|
final Parameter parameter = getParameter(true, true);
|
||||||
|
|
||||||
|
final String mapped = mapper.getMapped(parameter, VALUE);
|
||||||
|
|
||||||
|
assertEquals(StandardParameterValueMapper.PROVIDED_MAPPING, mapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetMappedSensitiveNotProvided() {
|
||||||
|
final Parameter parameter = getParameter(true, false);
|
||||||
|
|
||||||
|
final String mapped = mapper.getMapped(parameter, VALUE);
|
||||||
|
|
||||||
|
assertNotEquals(VALUE, mapped);
|
||||||
|
assertNotEquals(StandardParameterValueMapper.PROVIDED_MAPPING, mapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Parameter getParameter(final boolean sensitive, final boolean provided) {
|
||||||
|
final ParameterDescriptor descriptor = new ParameterDescriptor.Builder().name(NAME).sensitive(sensitive).build();
|
||||||
|
return new Parameter(descriptor, VALUE, null, provided);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,11 +20,13 @@ import org.apache.nifi.components.ConfigVerificationResult;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.nar.ExtensionManager;
|
import org.apache.nifi.nar.ExtensionManager;
|
||||||
import org.apache.nifi.parameter.ParameterContext;
|
import org.apache.nifi.parameter.ParameterContext;
|
||||||
|
import org.apache.nifi.parameter.ParameterGroup;
|
||||||
import org.apache.nifi.parameter.ParameterProvider;
|
import org.apache.nifi.parameter.ParameterProvider;
|
||||||
import org.apache.nifi.parameter.ParameterGroupConfiguration;
|
import org.apache.nifi.parameter.ParameterGroupConfiguration;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public interface ParameterProviderNode extends ComponentNode {
|
public interface ParameterProviderNode extends ComponentNode {
|
||||||
|
@ -41,12 +43,30 @@ public interface ParameterProviderNode extends ComponentNode {
|
||||||
|
|
||||||
void verifyCanFetchParameters();
|
void verifyCanFetchParameters();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve Parameter Groups from configured Parameter Provider and store for subsequent retrieval in others methods
|
||||||
|
*
|
||||||
|
*/
|
||||||
void fetchParameters();
|
void fetchParameters();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find a named Parameter Group cached from previous request to fetch Parameters from the configured Parameter Provider
|
||||||
|
*
|
||||||
|
* @param parameterGroupName Parameter Group Name to find
|
||||||
|
* @return Parameter Group with Parameter Names and Values or empty when not found
|
||||||
|
*/
|
||||||
|
Optional<ParameterGroup> findFetchedParameterGroup(String parameterGroupName);
|
||||||
|
|
||||||
void verifyCanApplyParameters(Collection<ParameterGroupConfiguration> parameterNames);
|
void verifyCanApplyParameters(Collection<ParameterGroupConfiguration> parameterNames);
|
||||||
|
|
||||||
Collection<ParameterGroupConfiguration> getParameterGroupConfigurations();
|
Collection<ParameterGroupConfiguration> getParameterGroupConfigurations();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Parameter Groups with Parameter Names and Values to be applied to associated Parameter Contexts
|
||||||
|
*
|
||||||
|
* @param parameterGroupConfigurations Parameter Group Configurations to be retrieved
|
||||||
|
* @return List of Parameter Groups and Parameter Contexts to be applied based on Parameters retrieved in previous fetch requests
|
||||||
|
*/
|
||||||
List<ParametersApplication> getFetchedParametersToApply(Collection<ParameterGroupConfiguration> parameterGroupConfigurations);
|
List<ParametersApplication> getFetchedParametersToApply(Collection<ParameterGroupConfiguration> parameterGroupConfigurations);
|
||||||
|
|
||||||
void verifyCanClearState();
|
void verifyCanClearState();
|
||||||
|
@ -76,7 +96,7 @@ public interface ParameterProviderNode extends ComponentNode {
|
||||||
* @param context the configuration to verify
|
* @param context the configuration to verify
|
||||||
* @param logger a logger that can be used when performing verification
|
* @param logger a logger that can be used when performing verification
|
||||||
* @param extensionManager extension manager that is used for obtaining appropriate NAR ClassLoaders
|
* @param extensionManager extension manager that is used for obtaining appropriate NAR ClassLoaders
|
||||||
* @return a list of results indicating whether or not the given configuration is valid
|
* @return a list of results indicating whether the given configuration is valid
|
||||||
*/
|
*/
|
||||||
List<ConfigVerificationResult> verifyConfiguration(ConfigurationContext context, ComponentLog logger, ExtensionManager extensionManager);
|
List<ConfigVerificationResult> verifyConfiguration(ConfigurationContext context, ComponentLog logger, ExtensionManager extensionManager);
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.nifi.controller.inheritance.ConnectionMissingCheck;
|
||||||
import org.apache.nifi.controller.inheritance.FlowInheritability;
|
import org.apache.nifi.controller.inheritance.FlowInheritability;
|
||||||
import org.apache.nifi.controller.inheritance.FlowInheritabilityCheck;
|
import org.apache.nifi.controller.inheritance.FlowInheritabilityCheck;
|
||||||
import org.apache.nifi.controller.inheritance.MissingComponentsCheck;
|
import org.apache.nifi.controller.inheritance.MissingComponentsCheck;
|
||||||
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
|
|
||||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.encrypt.PropertyEncryptor;
|
import org.apache.nifi.encrypt.PropertyEncryptor;
|
||||||
import org.apache.nifi.flow.Bundle;
|
import org.apache.nifi.flow.Bundle;
|
||||||
|
@ -77,6 +76,7 @@ import org.apache.nifi.parameter.Parameter;
|
||||||
import org.apache.nifi.parameter.ParameterContext;
|
import org.apache.nifi.parameter.ParameterContext;
|
||||||
import org.apache.nifi.parameter.ParameterContextManager;
|
import org.apache.nifi.parameter.ParameterContextManager;
|
||||||
import org.apache.nifi.parameter.ParameterDescriptor;
|
import org.apache.nifi.parameter.ParameterDescriptor;
|
||||||
|
import org.apache.nifi.parameter.ParameterGroup;
|
||||||
import org.apache.nifi.parameter.ParameterProviderConfiguration;
|
import org.apache.nifi.parameter.ParameterProviderConfiguration;
|
||||||
import org.apache.nifi.parameter.StandardParameterProviderConfiguration;
|
import org.apache.nifi.parameter.StandardParameterProviderConfiguration;
|
||||||
import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
|
import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
|
||||||
|
@ -115,10 +115,10 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
@ -381,24 +381,6 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private BundleCoordinate getCompatibleBundle(final BundleCoordinate coordinate, final ExtensionManager extensionManager, final String type) {
|
|
||||||
final org.apache.nifi.bundle.Bundle exactBundle = extensionManager.getBundle(coordinate);
|
|
||||||
if (exactBundle != null) {
|
|
||||||
return coordinate;
|
|
||||||
}
|
|
||||||
|
|
||||||
final BundleDTO bundleDto = new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion());
|
|
||||||
final Optional<BundleCoordinate> optionalCoordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, type, bundleDto);
|
|
||||||
if (optionalCoordinate.isPresent()) {
|
|
||||||
final BundleCoordinate selectedCoordinate = optionalCoordinate.get();
|
|
||||||
logger.debug("Found compatible bundle {} for {} and type {}", selectedCoordinate.getCoordinate(), coordinate, type);
|
|
||||||
return selectedCoordinate;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug("Could not find a compatible bundle for {} and type {}", coordinate, type);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void synchronizeFlow(final FlowController controller, final DataFlow existingFlow, final DataFlow proposedFlow, final AffectedComponentSet affectedComponentSet) {
|
private void synchronizeFlow(final FlowController controller, final DataFlow existingFlow, final DataFlow proposedFlow, final AffectedComponentSet affectedComponentSet) {
|
||||||
// attempt to sync controller with proposed flow
|
// attempt to sync controller with proposed flow
|
||||||
try {
|
try {
|
||||||
|
@ -594,7 +576,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
flowRegistryClient.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
|
flowRegistryClient.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void inheritReportingTasks(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) throws ReportingTaskInstantiationException {
|
private void inheritReportingTasks(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
|
||||||
final Set<String> versionedTaskIds = new HashSet<>();
|
final Set<String> versionedTaskIds = new HashSet<>();
|
||||||
for (final VersionedReportingTask versionedReportingTask : dataflow.getReportingTasks()) {
|
for (final VersionedReportingTask versionedReportingTask : dataflow.getReportingTasks()) {
|
||||||
versionedTaskIds.add(versionedReportingTask.getInstanceIdentifier());
|
versionedTaskIds.add(versionedReportingTask.getInstanceIdentifier());
|
||||||
|
@ -613,7 +595,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addReportingTask(final FlowController controller, final VersionedReportingTask reportingTask) throws ReportingTaskInstantiationException {
|
private void addReportingTask(final FlowController controller, final VersionedReportingTask reportingTask) {
|
||||||
final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, reportingTask.getBundle(), reportingTask.getType());
|
final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, reportingTask.getBundle(), reportingTask.getType());
|
||||||
|
|
||||||
final ReportingTaskNode taskNode = controller.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
|
final ReportingTaskNode taskNode = controller.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
|
||||||
|
@ -823,7 +805,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
final Map<String, VersionedParameterContext> namedParameterContexts,
|
final Map<String, VersionedParameterContext> namedParameterContexts,
|
||||||
final PropertyEncryptor encryptor
|
final PropertyEncryptor encryptor
|
||||||
) {
|
) {
|
||||||
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext, encryptor);
|
final Map<String, Parameter> parameters = createParameterMap(flowManager, versionedParameterContext, encryptor);
|
||||||
|
|
||||||
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
|
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
|
||||||
final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
|
final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
|
||||||
|
@ -868,8 +850,13 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
return referenceIds;
|
return referenceIds;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Parameter> createParameterMap(final VersionedParameterContext versionedParameterContext,
|
private Map<String, Parameter> createParameterMap(
|
||||||
final PropertyEncryptor encryptor) {
|
final FlowManager flowManager,
|
||||||
|
final VersionedParameterContext versionedParameterContext,
|
||||||
|
final PropertyEncryptor encryptor
|
||||||
|
) {
|
||||||
|
final Map<String, Parameter> providedParameters = getProvidedParameters(flowManager, versionedParameterContext);
|
||||||
|
|
||||||
final Map<String, Parameter> parameters = new HashMap<>();
|
final Map<String, Parameter> parameters = new HashMap<>();
|
||||||
for (final VersionedParameter versioned : versionedParameterContext.getParameters()) {
|
for (final VersionedParameter versioned : versionedParameterContext.getParameters()) {
|
||||||
final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
|
final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
|
||||||
|
@ -882,6 +869,15 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
final String rawValue = versioned.getValue();
|
final String rawValue = versioned.getValue();
|
||||||
if (rawValue == null) {
|
if (rawValue == null) {
|
||||||
parameterValue = null;
|
parameterValue = null;
|
||||||
|
} else if (versioned.isProvided()) {
|
||||||
|
final String name = versioned.getName();
|
||||||
|
final Parameter providedParameter = providedParameters.get(name);
|
||||||
|
if (providedParameter == null) {
|
||||||
|
logger.warn("Parameter Context [{}] Provided Parameter [{}] not found", versionedParameterContext.getIdentifier(), name);
|
||||||
|
parameterValue = null;
|
||||||
|
} else {
|
||||||
|
parameterValue = providedParameter.getValue();
|
||||||
|
}
|
||||||
} else if (versioned.isSensitive()) {
|
} else if (versioned.isSensitive()) {
|
||||||
parameterValue = decrypt(rawValue, encryptor);
|
parameterValue = decrypt(rawValue, encryptor);
|
||||||
} else {
|
} else {
|
||||||
|
@ -902,17 +898,11 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
final Map<String, VersionedParameterContext> namedParameterContexts,
|
final Map<String, VersionedParameterContext> namedParameterContexts,
|
||||||
final PropertyEncryptor encryptor
|
final PropertyEncryptor encryptor
|
||||||
) {
|
) {
|
||||||
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext, encryptor);
|
final Map<String, Parameter> parameters = createParameterMap(flowManager, versionedParameterContext, encryptor);
|
||||||
|
|
||||||
final Map<String, String> currentValues = new HashMap<>();
|
final Map<String, String> currentValues = new HashMap<>();
|
||||||
parameterContext.getParameters().values().forEach(param -> currentValues.put(param.getDescriptor().getName(), param.getValue()));
|
parameterContext.getParameters().values().forEach(param -> currentValues.put(param.getDescriptor().getName(), param.getValue()));
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
final Map<String, String> proposedValues = parameters.entrySet().stream()
|
|
||||||
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getValue()));
|
|
||||||
logger.debug("For Parameter Context {}, current parameters = {}, proposed = {}", parameterContext.getName(), currentValues, proposedValues);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<String, Parameter> updatedParameters = new HashMap<>();
|
final Map<String, Parameter> updatedParameters = new HashMap<>();
|
||||||
final Set<String> proposedParameterNames = new HashSet<>();
|
final Set<String> proposedParameterNames = new HashSet<>();
|
||||||
for (final VersionedParameter parameter : versionedParameterContext.getParameters()) {
|
for (final VersionedParameter parameter : versionedParameterContext.getParameters()) {
|
||||||
|
@ -1010,6 +1000,38 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
|
||||||
removeMissingServices(controller, dataflow);
|
removeMissingServices(controller, dataflow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, Parameter> getProvidedParameters(final FlowManager flowManager, final VersionedParameterContext versionedParameterContext) {
|
||||||
|
final Map<String, Parameter> providedProviders;
|
||||||
|
final String parameterProviderId = versionedParameterContext.getParameterProvider();
|
||||||
|
if (parameterProviderId == null) {
|
||||||
|
providedProviders = Collections.emptyMap();
|
||||||
|
} else {
|
||||||
|
final ParameterProviderNode parameterProviderNode = flowManager.getParameterProvider(parameterProviderId);
|
||||||
|
providedProviders = getProvidedParameters(parameterProviderNode, versionedParameterContext.getParameterGroupName());
|
||||||
|
}
|
||||||
|
return providedProviders;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Parameter> getProvidedParameters(final ParameterProviderNode parameterProviderNode, final String parameterGroupName) {
|
||||||
|
logger.debug("Fetching Parameters for Group [{}] from Provider [{}]", parameterGroupName, parameterProviderNode.getIdentifier());
|
||||||
|
parameterProviderNode.fetchParameters();
|
||||||
|
|
||||||
|
final Map<String, Parameter> parameters;
|
||||||
|
final Optional<ParameterGroup> foundParameterGroup = parameterProviderNode.findFetchedParameterGroup(parameterGroupName);
|
||||||
|
if (foundParameterGroup.isPresent()) {
|
||||||
|
final ParameterGroup parameterGroup = foundParameterGroup.get();
|
||||||
|
parameters = parameterGroup.getParameters().stream()
|
||||||
|
.collect(
|
||||||
|
Collectors.toMap(parameter -> parameter.getDescriptor().getName(), Function.identity())
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
parameters = Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Fetched Parameters [{}] for Group [{}] from Provider [{}]", parameters.size(), parameterGroupName, parameterProviderNode.getIdentifier());
|
||||||
|
return parameters;
|
||||||
|
}
|
||||||
|
|
||||||
private void removeMissingServices(final FlowController controller, final VersionedDataflow dataflow) {
|
private void removeMissingServices(final FlowController controller, final VersionedDataflow dataflow) {
|
||||||
if (dataflow == null) {
|
if (dataflow == null) {
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue