NIFI-10261: Ensure that when comparing Sensitive Parameters during flow sync we decrypt values for the comparison

This closes #6231

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-07-21 16:31:25 -04:00 committed by exceptionfactory
parent 0497907829
commit 92750c2746
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 171 additions and 19 deletions

View File

@ -76,7 +76,7 @@ public class ServiceStateTransition {
} }
state = ControllerServiceState.ENABLED; state = ControllerServiceState.ENABLED;
logger.debug("{} transitioned to ENABLED", controllerServiceNode); logger.debug("{} is now fully ENABLED", controllerServiceNode);
enabledFutures.forEach(future -> future.complete(null)); enabledFutures.forEach(future -> future.complete(null));
} finally { } finally {
@ -124,7 +124,7 @@ public class ServiceStateTransition {
writeLock.lock(); writeLock.lock();
try { try {
state = ControllerServiceState.DISABLED; state = ControllerServiceState.DISABLED;
logger.debug("{} transitioned to DISABLED", controllerServiceNode); logger.info("{} is now fully DISABLED", controllerServiceNode);
stateChangeCondition.signalAll(); stateChangeCondition.signalAll();
disabledFutures.forEach(future -> future.complete(null)); disabledFutures.forEach(future -> future.complete(null));

View File

@ -304,8 +304,16 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override @Override
public void verifyModifiable() throws IllegalStateException { public void verifyModifiable() throws IllegalStateException {
if (getState() != ControllerServiceState.DISABLED) { final ControllerServiceState state = getState();
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
if (state == ControllerServiceState.DISABLING) {
// Provide precise/accurate error message for DISABLING case
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently still disabling. " +
"Please wait for the service to fully disable before attempting to modify it.");
}
if (state != ControllerServiceState.DISABLED) {
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently not disabled - it has a state of " + state
+ ". Please disable the Controller Service first.");
} }
} }
@ -654,6 +662,11 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
} }
final CompletableFuture<Void> future = new CompletableFuture<>(); final CompletableFuture<Void> future = new CompletableFuture<>();
final boolean transitioned = this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLING, future);
if (transitioned) {
return future;
}
if (this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLED, future)) { if (this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLED, future)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry()); final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
scheduler.execute(new Runnable() { scheduler.execute(new Runnable() {
@ -672,8 +685,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
} }
} }
}); });
} else {
this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLING, future);
} }
return future; return future;

View File

@ -179,6 +179,15 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
// Stop the active components, and then wait for all components to be stopped. // Stop the active components, and then wait for all components to be stopped.
logger.info("In order to inherit proposed dataflow, will stop any components that will be affected by the update"); logger.info("In order to inherit proposed dataflow, will stop any components that will be affected by the update");
if (logger.isDebugEnabled()) {
logger.debug("Will stop the following components:");
logger.debug(activeSet.toString());
final String differencesToString = flowDifferences.stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
logger.debug("This Active Set was determined from the following Flow Differences:\n{}", differencesToString);
}
activeSet.stop(); activeSet.stop();
try { try {

View File

@ -23,6 +23,8 @@ import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel; import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedProcessor;
@ -30,8 +32,8 @@ import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedRemoteGroupPort; import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.flow.VersionedParameter; import org.slf4j.Logger;
import org.apache.nifi.flow.VersionedParameterContext; import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -44,6 +46,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class StandardFlowComparator implements FlowComparator { public class StandardFlowComparator implements FlowComparator {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowComparator.class);
private static final String ENCRYPTED_VALUE_PREFIX = "enc{"; private static final String ENCRYPTED_VALUE_PREFIX = "enc{";
private static final String ENCRYPTED_VALUE_SUFFIX = "}"; private static final String ENCRYPTED_VALUE_SUFFIX = "}";
@ -86,7 +90,6 @@ public class StandardFlowComparator implements FlowComparator {
} }
private Set<FlowDifference> compare(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB) { private Set<FlowDifference> compare(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB) {
final Set<FlowDifference> differences = new HashSet<>(); final Set<FlowDifference> differences = new HashSet<>();
// Note that we do not compare the names, because when we import a Flow into NiFi, we may well give it a new name. // Note that we do not compare the names, because when we import a Flow into NiFi, we may well give it a new name.
@ -95,14 +98,6 @@ public class StandardFlowComparator implements FlowComparator {
return differences; return differences;
} }
private boolean allHaveInstanceId(Set<? extends VersionedComponent> components) {
if (components == null) {
return false;
}
return components.stream().allMatch(component -> component.getInstanceIdentifier() != null);
}
private <T extends VersionedComponent> Set<FlowDifference> compareComponents(final Set<T> componentsA, final Set<T> componentsB, final ComponentComparator<T> comparator) { private <T extends VersionedComponent> Set<FlowDifference> compareComponents(final Set<T> componentsA, final Set<T> componentsB, final ComponentComparator<T> comparator) {
final Map<String, T> componentMapA = byId(componentsA == null ? Collections.emptySet() : componentsA); final Map<String, T> componentMapA = byId(componentsA == null ? Collections.emptySet() : componentsA);
final Map<String, T> componentMapB = byId(componentsB == null ? Collections.emptySet() : componentsB); final Map<String, T> componentMapB = byId(componentsB == null ? Collections.emptySet() : componentsB);
@ -198,7 +193,7 @@ public class StandardFlowComparator implements FlowComparator {
compareProperties(taskA, taskB, taskA.getProperties(), taskB.getProperties(), taskA.getPropertyDescriptors(), taskB.getPropertyDescriptors(), differences); compareProperties(taskA, taskB, taskA.getProperties(), taskB.getProperties(), taskA.getPropertyDescriptors(), taskB.getPropertyDescriptors(), differences);
} }
private void compare(final VersionedParameterContext contextA, final VersionedParameterContext contextB, final Set<FlowDifference> differences) { void compare(final VersionedParameterContext contextA, final VersionedParameterContext contextB, final Set<FlowDifference> differences) {
if (compareComponents(contextA, contextB, differences)) { if (compareComponents(contextA, contextB, differences)) {
return; return;
} }
@ -218,7 +213,9 @@ public class StandardFlowComparator implements FlowComparator {
continue; continue;
} }
if (!Objects.equals(parameterA.getValue(), parameterB.getValue())) { final String decryptedValueA = decryptValue(parameterA);
final String decryptedValueB = decryptValue(parameterB);
if (!Objects.equals(decryptedValueA, decryptedValueB)) {
final String valueA = parameterA.isSensitive() ? "<Sensitive Value A>" : parameterA.getValue(); final String valueA = parameterA.isSensitive() ? "<Sensitive Value A>" : parameterA.getValue();
final String valueB = parameterB.isSensitive() ? "<Sensitive Value B>" : parameterB.getValue(); final String valueB = parameterB.isSensitive() ? "<Sensitive Value B>" : parameterB.getValue();
final String description = differenceDescriptor.describeDifference(DifferenceType.PARAMETER_VALUE_CHANGED, flowA.getName(), flowB.getName(), contextA, contextB, name, valueA, valueB); final String description = differenceDescriptor.describeDifference(DifferenceType.PARAMETER_VALUE_CHANGED, flowA.getName(), flowB.getName(), contextA, contextB, name, valueA, valueB);
@ -282,6 +279,21 @@ public class StandardFlowComparator implements FlowComparator {
return propertyDecryptor.apply(value.substring(ENCRYPTED_VALUE_PREFIX.length(), value.length() - ENCRYPTED_VALUE_SUFFIX.length())); return propertyDecryptor.apply(value.substring(ENCRYPTED_VALUE_PREFIX.length(), value.length() - ENCRYPTED_VALUE_SUFFIX.length()));
} }
private String decryptValue(final VersionedParameter parameter) {
final String rawValue = parameter.getValue();
if (rawValue == null) {
return null;
}
final boolean sensitive = parameter.isSensitive() && rawValue.startsWith(ENCRYPTED_VALUE_PREFIX) && rawValue.endsWith(ENCRYPTED_VALUE_SUFFIX);
if (!sensitive) {
logger.debug("Will not decrypt value for parameter {} because it is not encrypted", parameter.getName());
return rawValue;
}
return propertyDecryptor.apply(rawValue.substring(ENCRYPTED_VALUE_PREFIX.length(), rawValue.length() - ENCRYPTED_VALUE_SUFFIX.length()));
}
private void compareProperties(final VersionedComponent componentA, final VersionedComponent componentB, private void compareProperties(final VersionedComponent componentA, final VersionedComponent componentB,
final Map<String, String> propertiesA, final Map<String, String> propertiesB, final Map<String, String> propertiesA, final Map<String, String> propertiesB,
final Map<String, VersionedPropertyDescriptor> descriptorsA, final Map<String, VersionedPropertyDescriptor> descriptorsB, final Map<String, VersionedPropertyDescriptor> descriptorsA, final Map<String, VersionedPropertyDescriptor> descriptorsB,

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow.diff;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
public class TestStandardFlowComparator {
private Map<String, String> decryptedToEncrypted;
private Map<String, String> encryptedToDecrypted;
private StandardFlowComparator comparator;
@BeforeEach
public void setup() {
decryptedToEncrypted = new HashMap<>();
decryptedToEncrypted.put("XYZ", "Hello");
decryptedToEncrypted.put("xyz", "hola");
encryptedToDecrypted = new HashMap<>();
encryptedToDecrypted.put("Hello", "XYZ");
encryptedToDecrypted.put("hola", "xyz");
final Function<String, String> decryptor = encryptedToDecrypted::get;
final ComparableDataFlow flowA = new StandardComparableDataFlow("Flow A", new VersionedProcessGroup());
final ComparableDataFlow flowB = new StandardComparableDataFlow("Flow B", new VersionedProcessGroup());
comparator = new StandardFlowComparator(flowA, flowB, Collections.emptySet(),
new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getInstanceIdentifier);
}
// Ensure that when we are comparing parameter values that we compare the decrypted values, but we don't include any
// decrypted values in the descriptions of the Flow Difference.
@Test
public void testSensitiveParametersDecryptedBeforeCompare() {
final Set<FlowDifference> differences = new HashSet<>();
final Set<VersionedParameter> parametersA = new HashSet<>();
parametersA.add(createParameter("Param 1", "xyz", false));
parametersA.add(createParameter("Param 2", "XYZ", false));
parametersA.add(createParameter("Param 3", "Hi there", false));
parametersA.add(createParameter("Param 4", "xyz", true));
parametersA.add(createParameter("Param 5", "XYZ", true));
// Now that we've created the parameters, change the mapping of decrypted to encrypted so that we encrypt the values
// differently in each context but have the same decrypted value
decryptedToEncrypted.put("xyz", "bonjour");
encryptedToDecrypted.put("bonjour", "xyz");
final Set<VersionedParameter> parametersB = new HashSet<>();
parametersB.add(createParameter("Param 1", "xyz", false));
parametersB.add(createParameter("Param 2", "XYZ", false));
parametersB.add(createParameter("Param 3", "Hey", false));
parametersB.add(createParameter("Param 4", "xyz", true));
parametersB.add(createParameter("Param 5", "xyz", true));
final VersionedParameterContext contextA = new VersionedParameterContext();
contextA.setIdentifier("id");
contextA.setInstanceIdentifier("instanceId");
contextA.setParameters(parametersA);
final VersionedParameterContext contextB = new VersionedParameterContext();
contextB.setIdentifier("id");
contextB.setInstanceIdentifier("instanceId");
contextB.setParameters(parametersB);
comparator.compare(contextA, contextB, differences);
assertEquals(2, differences.size());
for (final FlowDifference difference : differences) {
assertSame(DifferenceType.PARAMETER_VALUE_CHANGED, difference.getDifferenceType());
// Ensure that the sensitive values are not contained in the description
assertFalse(difference.getDescription().contains("Hello"));
assertFalse(difference.getDescription().contains("Hola"));
assertFalse(difference.getDescription().contains("bonjour"));
}
final long numContainingValue = differences.stream()
.filter(diff -> diff.getDescription().contains("Hey") && diff.getDescription().contains("Hi there"))
.count();
assertEquals(1, numContainingValue);
}
private VersionedParameter createParameter(final String name, final String value, final boolean sensitive) {
final VersionedParameter parameter = new VersionedParameter();
parameter.setName(name);
parameter.setValue(sensitive ? "enc{" + decryptedToEncrypted.get(value) + "}" : value);
parameter.setSensitive(sensitive);
return parameter;
}
}