NIFI-12372 [MiNiFi] Encrpyt raw flow sensitive properties

Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com>
This closes #8028.
This commit is contained in:
Ferenc Kis 2023-11-15 14:40:03 +01:00 committed by Ferenc Erdei
parent 2d82cdc0f5
commit 9d50c6dd53
No known key found for this signature in database
GPG Key ID: 023D856C60E92F96
15 changed files with 794 additions and 123 deletions

View File

@ -46,7 +46,8 @@ import org.apache.nifi.minifi.bootstrap.service.ReloadService;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.properties.ApplicationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,7 +118,7 @@ public class RunMiNiFi implements ConfigurationFileHolder {
periodicStatusReporterManager =
new PeriodicStatusReporterManager(bootstrapProperties, miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
MiNiFiConfigurationChangeListener configurationChangeListener = new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider,
new FlowEnrichService(new ApplicationProperties(bootstrapProperties)));
new StandardFlowEnrichService(new ApplicationProperties(bootstrapProperties)), StandardFlowSerDeService.defaultInstance());
configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider, this, singleton(configurationChangeListener));
currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);

View File

@ -35,11 +35,13 @@ import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.slf4j.Logger;
public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
@ -50,12 +52,15 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
private final Logger logger;
private final BootstrapFileProvider bootstrapFileProvider;
private final FlowEnrichService flowEnrichService;
private final FlowSerDeService flowSerDeService;
public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider, FlowEnrichService flowEnrichService) {
public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider,
FlowEnrichService flowEnrichService, FlowSerDeService flowSerDeService) {
this.runner = runner;
this.logger = logger;
this.bootstrapFileProvider = bootstrapFileProvider;
this.flowEnrichService = flowEnrichService;
this.flowSerDeService = flowSerDeService;
}
@Override
@ -83,8 +88,10 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
byte[] rawFlow = toByteArray(flowConfigInputStream);
byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow);
persist(enrichedFlow, currentFlowConfigFile, true);
VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow);
VersionedDataflow enrichedFlow = flowEnrichService.enrichFlow(rawDataFlow);
byte[] serializedEnrichedFlow = flowSerDeService.serialize(enrichedFlow);
persist(serializedEnrichedFlow, currentFlowConfigFile, true);
restartInstance();
persist(rawFlow, currentRawFlowConfigFile, false);
setActiveFlowReference(wrap(rawFlow));

View File

@ -27,4 +27,12 @@
<artifactId>minifi-commons-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import org.apache.nifi.controller.flow.VersionedDataflow;
/**
* Defines interface methods used to implement a FlowEnrichService.
* The purpose of a flow enrich service is to enrich a VersionedDataFlow with various additional components specific to the MiNiFi instance
*/
public interface FlowEnrichService {
/**
* Responsible for enriching a VersionedDataflow instance
*
* @param versionedDataflow a VersionedDataflow instance
* @return VersionedDataflow the enriched flow instance
*/
VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow);
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import org.apache.nifi.controller.flow.VersionedDataflow;
/**
* Defines interface methods used to implement a FlowPropertyEncryptor.
* The purpose of a flow property encryptor is to encrypt sensitive properties in the flow using a particular strategy.
*/
public interface FlowPropertyEncryptor {
/**
* Responsible for encrypting sensitive properties in a VersionedDataflow instance
*
* @param flow a VersionedDataflow instance to encrypt its sensitive properties
* @return VersionedDataflow the flow instance with encrypted sensitive properties
*/
VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow);
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import org.apache.nifi.controller.flow.VersionedDataflow;
/**
* Defines interface methods used to implement a FlowSerDeService.
* The purpose of a flow serde service is to provide serialisation / deserialization for VersionedDataflow
*/
public interface FlowSerDeService {
/**
* Responsible for serialising a VersionedDataflow instance
*
* @param flow a VersionedDataflow instance to be serialised
* @return byte[] VersionedDataflow in serialised format
*/
byte[] serialize(VersionedDataflow flow);
/**
* Responsible for deserializing a VersionedDataflow instance
*
* @param flow a VersionedDataflow instance in serialised (byte array) format
* @return VersionedDataflow a deserialized instance
*/
VersionedDataflow deserialize(byte[] flow);
}

View File

@ -47,6 +47,16 @@ limitations under the License.
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-encryptor</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@ -19,7 +19,6 @@ package org.apache.nifi.minifi.commons.service;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.parseBoolean;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Map.entry;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
@ -28,21 +27,12 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.flow.ScheduledState.ENABLED;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.ControllerServiceAPI;
@ -57,7 +47,7 @@ import org.apache.nifi.properties.ReadableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlowEnrichService {
public class StandardFlowEnrichService implements FlowEnrichService {
static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context Service";
@ -66,7 +56,7 @@ public class FlowEnrichService {
static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME = "Site-To-Site-Provenance-Reporting";
static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_ID = "generated-s2s-provenance-reporting-task";
private static final Logger LOG = LoggerFactory.getLogger(FlowEnrichService.class);
private static final Logger LOG = LoggerFactory.getLogger(StandardFlowEnrichService.class);
private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi";
private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE = "org.apache.nifi.ssl.StandardRestrictedSSLContextService";
@ -81,16 +71,12 @@ public class FlowEnrichService {
private final ReadableProperties minifiProperties;
public FlowEnrichService(ReadableProperties minifiProperties) {
public StandardFlowEnrichService(ReadableProperties minifiProperties) {
this.minifiProperties = minifiProperties;
}
public byte[] enrichFlow(byte[] flowCandidate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Enriching flow with content: \n{}", new String(flowCandidate, UTF_8));
}
VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate);
@Override
public VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow) {
versionedDataflow.setReportingTasks(ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new));
versionedDataflow.setRegistries(ofNullable(versionedDataflow.getRegistries()).orElseGet(ArrayList::new));
versionedDataflow.setControllerServices(ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new));
@ -119,27 +105,8 @@ public class FlowEnrichService {
createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY))
.ifPresent(versionedDataflow.getReportingTasks()::add);
byte[] enrichedFlow = toByteArray(versionedDataflow);
if (LOG.isDebugEnabled()) {
LOG.debug("Enriched flow with content: \n{}", new String(enrichedFlow, UTF_8));
}
return enrichedFlow;
}
private VersionedDataflow parseVersionedDataflow(byte[] flow) {
try {
ObjectMapper objectMapper = deserializationObjectMapper();
return objectMapper.readValue(flow, VersionedDataflow.class);
} catch (final Exception e) {
throw new FlowSerializationException("Could not parse flow as a VersionedDataflow", e);
}
}
private ObjectMapper deserializationObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory()));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return objectMapper;
return versionedDataflow;
}
private Optional<VersionedControllerService> createCommonSslControllerService() {
@ -260,27 +227,4 @@ public class FlowEnrichService {
.filter(entry -> StringUtils.isNotBlank(entry.getValue()))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private byte[] toByteArray(VersionedDataflow versionedDataflow) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
JsonFactory factory = new JsonFactory();
JsonGenerator generator = factory.createGenerator(byteArrayOutputStream);
generator.setCodec(serializationObjectMapper());
generator.writeObject(versionedDataflow);
generator.flush();
byteArrayOutputStream.flush();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Unable to convert flow to byte array", e);
}
}
private ObjectMapper serializationObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory()));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return objectMapper;
}
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import static java.util.Optional.ofNullable;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.concat;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.nifi.c2.protocol.component.api.DefinedType;
import org.apache.nifi.c2.protocol.component.api.PropertyDescriptor;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
public class StandardFlowPropertyEncryptor implements FlowPropertyEncryptor {
private static final String ENCRYPTED_FORMAT = "enc{%s}";
private final PropertyEncryptor propertyEncryptor;
private final RuntimeManifest runTimeManifest;
public StandardFlowPropertyEncryptor(PropertyEncryptor propertyEncryptor, RuntimeManifest runTimeManifest) {
this.propertyEncryptor = propertyEncryptor;
this.runTimeManifest = runTimeManifest;
}
@Override
public VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow) {
encryptParameterContextsProperties(flow);
Map<String, Set<String>> sensitivePropertiesByComponentType = Optional.of(flowProvidedSensitiveProperties(flow))
.filter(not(Map::isEmpty))
.orElseGet(this::runtimeManifestSensitiveProperties);
encryptFlowComponentsProperties(flow, sensitivePropertiesByComponentType);
return flow;
}
private void encryptParameterContextsProperties(VersionedDataflow flow) {
ofNullable(flow.getParameterContexts())
.orElse(List.of())
.forEach(parameterContext -> ofNullable(parameterContext.getParameters()).orElse(Set.of())
.stream()
.filter(VersionedParameter::isSensitive)
.filter(not(parameter -> ofNullable(parameter.getValue()).orElse(EMPTY).startsWith(FlowSerializer.ENC_PREFIX)))
.forEach(parameter -> parameter.setValue(encrypt(parameter.getValue()))));
}
private Map<String, Set<String>> flowProvidedSensitiveProperties(VersionedDataflow flow) {
return fetchFlowComponents(flow)
.map(extension -> Map.entry(
extension.getType(),
ofNullable(extension.getPropertyDescriptors()).orElse(Map.of())
.values()
.stream()
.filter(VersionedPropertyDescriptor::isSensitive)
.map(VersionedPropertyDescriptor::getName)
.collect(toSet())
))
.filter(not(entry -> entry.getValue().isEmpty()))
.collect(toMap(Entry::getKey, Entry::getValue, this::mergeSets));
}
private Map<String, Set<String>> runtimeManifestSensitiveProperties() {
return ofNullable(runTimeManifest.getBundles()).orElse(List.of())
.stream()
.flatMap(bundle -> Stream.of(
ofNullable(bundle.getComponentManifest().getProcessors()).orElse(List.of()),
ofNullable(bundle.getComponentManifest().getControllerServices()).orElse(List.of())
))
.flatMap(List::stream)
.collect(toMap(
DefinedType::getType,
type -> ofNullable(type.getPropertyDescriptors()).orElse(Map.of())
.values()
.stream()
.filter(PropertyDescriptor::getSensitive)
.map(PropertyDescriptor::getName)
.collect(toSet()),
this::mergeSets
));
}
private void encryptFlowComponentsProperties(VersionedDataflow flow, Map<String, Set<String>> sensitivePropertiesByComponentType) {
fetchFlowComponents(flow)
.forEach(extension -> {
Set<String> sensitivePropertyNames = sensitivePropertiesByComponentType.getOrDefault(extension.getType(), Set.of());
Map<String, String> encryptedProperties = ofNullable(extension.getProperties()).orElse(Map.of())
.entrySet()
.stream()
.collect(toMap(Entry::getKey, encryptPropertyIfNeeded(sensitivePropertyNames)));
extension.setProperties(encryptedProperties);
});
}
private Stream<? extends VersionedConfigurableExtension> fetchFlowComponents(VersionedDataflow flow) {
return concat(
ofNullable(flow.getControllerServices()).orElse(List.of()).stream(),
fetchComponentsRecursively(flow.getRootGroup())
);
}
private Stream<? extends VersionedConfigurableExtension> fetchComponentsRecursively(VersionedProcessGroup processGroup) {
return concat(
Stream.of(
ofNullable(processGroup.getProcessors()).orElse(Set.of()),
ofNullable(processGroup.getControllerServices()).orElse(Set.of()))
.flatMap(Set::stream),
ofNullable(processGroup.getProcessGroups()).orElse(Set.of()).stream()
.flatMap(this::fetchComponentsRecursively)
);
}
private Set<String> mergeSets(Set<String> first, Set<String> second) {
first.addAll(second);
return first;
}
private Function<Entry<String, String>, String> encryptPropertyIfNeeded(Set<String> sensitivePropertyNames) {
return entry ->
sensitivePropertyNames.contains(entry.getKey()) && !entry.getValue().startsWith(FlowSerializer.ENC_PREFIX)
? encrypt(entry.getValue())
: entry.getValue();
}
private String encrypt(String parameter) {
return String.format(ENCRYPTED_FORMAT, propertyEncryptor.encrypt(parameter));
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.serialization.FlowSerializationException;
public class StandardFlowSerDeService implements FlowSerDeService {
private final ObjectMapper objectMapper;
StandardFlowSerDeService(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public static StandardFlowSerDeService defaultInstance() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory()));
objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
return new StandardFlowSerDeService(objectMapper);
}
@Override
public byte[] serialize(VersionedDataflow flow) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
JsonFactory factory = new JsonFactory();
JsonGenerator generator = factory.createGenerator(byteArrayOutputStream);
generator.setCodec(objectMapper);
generator.writeObject(flow);
generator.flush();
byteArrayOutputStream.flush();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw new FlowSerializationException("Unable to serialize flow", e);
}
}
@Override
public VersionedDataflow deserialize(byte[] flow) {
try {
return objectMapper.readValue(flow, VersionedDataflow.class);
} catch (Exception e) {
throw new FlowSerializationException("Unable to deserialize flow", e);
}
}
}

View File

@ -18,9 +18,10 @@
package org.apache.nifi.minifi.commons.service;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.minifi.commons.service.FlowEnrichService.COMMON_SSL_CONTEXT_SERVICE_NAME;
import static org.apache.nifi.minifi.commons.service.FlowEnrichService.DEFAULT_SSL_CONTEXT_SERVICE_NAME;
import static org.apache.nifi.minifi.commons.service.FlowEnrichService.SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME;
import static java.util.UUID.randomUUID;
import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.COMMON_SSL_CONTEXT_SERVICE_NAME;
import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.DEFAULT_SSL_CONTEXT_SERVICE_NAME;
import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -33,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -46,76 +48,64 @@ import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.properties.StandardReadableProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
public class FlowEnrichServiceTest {
public class StandardFlowEnrichServiceTest {
private static final Path DEFAULT_FLOW_JSON = Path.of("src/test/resources/default_flow.json");
@Test
public void testFlowIsLeftIntactIfEnrichingIsNotNecessary() {
// given
Map<String, String> properties = Map.of();
byte[] testFlowBytes = flowToString(loadDefaultFlow()).getBytes(UTF_8);
VersionedDataflow testFlow = loadDefaultFlow();
// when
FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties));
byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes);
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
// then
byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8);
byte[] enrichedFlowBytes = flowToString(enrichedFlow).getBytes(UTF_8);
assertArrayEquals(testFlowBytes, enrichedFlowBytes);
}
@Test
public void testMissingRootGroupIdsAreFilledIn() {
// given
Map<String, String> properties = Map.of();
VersionedDataflow testFlow = loadDefaultFlow();
testFlow.getRootGroup().setIdentifier(null);
testFlow.getRootGroup().setInstanceIdentifier(null);
byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8);
UUID expectedIdentifier = UUID.randomUUID();
UUID expectedIdentifier = randomUUID();
try (MockedStatic<UUID> uuid = mockStatic(UUID.class)) {
uuid.when(UUID::randomUUID).thenReturn(expectedIdentifier);
// when
FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties));
byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes);
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
// then
VersionedDataflow versionedDataflow = flowFromString(new String(enrichedFlowBytes, UTF_8));
Assertions.assertEquals(expectedIdentifier.toString(), versionedDataflow.getRootGroup().getIdentifier());
Assertions.assertEquals(expectedIdentifier.toString(), versionedDataflow.getRootGroup().getInstanceIdentifier());
assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getIdentifier());
assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getInstanceIdentifier());
}
}
@Test
public void testCommonSslControllerServiceIsAddedWithBundleVersionAndProcessorControllerServiceIsOverridden() {
// given
Map<String, String> properties = securityProperties(true);
VersionedDataflow versionedDataflow = loadDefaultFlow();
VersionedDataflow testFlow = loadDefaultFlow();
Bundle bundle = bundle("org.apache.nifi", "nifi-ssl-context-service-nar", StringUtils.EMPTY);
String originalSslControllerServiceId = "original_ssl_controller_service_id";
versionedDataflow.getRootGroup()
testFlow.getRootGroup()
.setProcessors(Set.of(
processor(bundle, "processor_1", originalSslControllerServiceId),
processor(bundle, "processor_2", originalSslControllerServiceId)
));
byte[] testFlowBytes = flowToString(versionedDataflow).getBytes(UTF_8);
// when
FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties));
byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes);
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
// then
VersionedDataflow enrichedFlow = flowFromString(new String(enrichedFlowBytes, UTF_8));
Assertions.assertEquals(1, enrichedFlow.getControllerServices().size());
assertEquals(1, enrichedFlow.getControllerServices().size());
VersionedControllerService sslControllerService = enrichedFlow.getControllerServices().get(0);
assertEquals(COMMON_SSL_CONTEXT_SERVICE_NAME, sslControllerService.getName());
Assertions.assertEquals(StringUtils.EMPTY, sslControllerService.getBundle().getVersion());
assertEquals(StringUtils.EMPTY, sslControllerService.getBundle().getVersion());
Set<VersionedProcessor> processors = enrichedFlow.getRootGroup().getProcessors();
assertEquals(2, processors.size());
assertTrue(
@ -128,7 +118,6 @@ public class FlowEnrichServiceTest {
@Test
public void testProvenanceReportingTaskIsAdded() {
// given
Map<String, String> properties = Map.of(
MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey(), "comment",
MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey(), "timer_driven",
@ -140,21 +129,18 @@ public class FlowEnrichServiceTest {
MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE.getKey(), "1000",
MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT.getKey(), "30 sec"
);
byte[] testFlowBytes = flowToString(loadDefaultFlow()).getBytes(UTF_8);
VersionedDataflow testFlow = loadDefaultFlow();
// when
FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties));
byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes);
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
// then
VersionedDataflow enrichedFlow = flowFromString(new String(enrichedFlowBytes, UTF_8));
List<VersionedReportingTask> reportingTasks = enrichedFlow.getReportingTasks();
assertEquals(1, reportingTasks.size());
VersionedReportingTask provenanceReportingTask = reportingTasks.get(0);
assertEquals(SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME, provenanceReportingTask.getName());
Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey()), provenanceReportingTask.getComments());
Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey()), provenanceReportingTask.getSchedulingStrategy());
Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD.getKey()), provenanceReportingTask.getSchedulingPeriod());
assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey()), provenanceReportingTask.getComments());
assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey()), provenanceReportingTask.getSchedulingStrategy());
assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD.getKey()), provenanceReportingTask.getSchedulingPeriod());
Map<String, String> provenanceReportingTaskProperties = provenanceReportingTask.getProperties();
assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME.getKey()), provenanceReportingTaskProperties.get("Input Port Name"));
assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL.getKey()), provenanceReportingTaskProperties.get("Destination URL"));
@ -224,10 +210,10 @@ public class FlowEnrichServiceTest {
private VersionedProcessor processor(Bundle bundle, String name, String originalSslControllerServiceId) {
VersionedProcessor versionedProcessor = new VersionedProcessor();
versionedProcessor.setIdentifier(UUID.randomUUID().toString());
versionedProcessor.setIdentifier(randomUUID().toString());
versionedProcessor.setName(name);
versionedProcessor.setBundle(bundle);
versionedProcessor.setProperties(Map.of(DEFAULT_SSL_CONTEXT_SERVICE_NAME, originalSslControllerServiceId));
versionedProcessor.setProperties(new HashMap<>(Map.of(DEFAULT_SSL_CONTEXT_SERVICE_NAME, originalSslControllerServiceId)));
return versionedProcessor;
}
}

View File

@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.service;
import static java.util.Map.entry;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.c2.protocol.component.api.Bundle;
import org.apache.nifi.c2.protocol.component.api.ComponentManifest;
import org.apache.nifi.c2.protocol.component.api.ControllerServiceDefinition;
import org.apache.nifi.c2.protocol.component.api.ProcessorDefinition;
import org.apache.nifi.c2.protocol.component.api.PropertyDescriptor;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class StandardFlowPropertyEncryptorTest {
private static final String PROCESSOR_TYPE_1 = "processor_type_1";
private static final String PROCESSOR_TYPE_2 = "processor_type_2";
private static final String PROCESSOR_TYPE_3 = "processor_type_3";
private static final String CONTROLLER_SERVICE_TYPE_1 = "controller_service_type_1";
private static final String CONTROLLER_SERVICE_TYPE_2 = "controller_service_type_2";
private static final String CONTROLLER_SERVICE_TYPE_3 = "controller_service_type_3";
private static final String SENSITIVE_PROPERTY_NAME_PREFIX = "sensitive";
private static final String NON_SENSITIVE_1 = "non-sensitive-1";
private static final String SENSITIVE_1 = SENSITIVE_PROPERTY_NAME_PREFIX + "-1";
private static final String NON_SENSITIVE_2 = "non-sensitive-2";
private static final String SENSITIVE_3 = SENSITIVE_PROPERTY_NAME_PREFIX + "-3";
private static final Map<String, String> PARAMETERS1 = Map.of(
NON_SENSITIVE_1, NON_SENSITIVE_1,
SENSITIVE_1, SENSITIVE_1
);
private static final Map<String, String> PARAMETERS2 = Map.of(
NON_SENSITIVE_2, NON_SENSITIVE_2
);
private static final Map<String, String> PARAMETERS3 = Map.of(
SENSITIVE_3, SENSITIVE_3
);
private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS1 = Map.of(
NON_SENSITIVE_1, versionedPropertyDescriptor(NON_SENSITIVE_1, false),
SENSITIVE_1, versionedPropertyDescriptor(SENSITIVE_1, true)
);
private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS2 = Map.of(
NON_SENSITIVE_2, versionedPropertyDescriptor(NON_SENSITIVE_2, false)
);
private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS3 = Map.of(
SENSITIVE_3, versionedPropertyDescriptor(SENSITIVE_3, true)
);
@Mock
private PropertyEncryptor mockPropertyEncryptor;
@Mock
private RuntimeManifest mockRunTimeManifest;
private FlowPropertyEncryptor testEncryptor;
private static VersionedPropertyDescriptor versionedPropertyDescriptor(String name, boolean isSensitive) {
VersionedPropertyDescriptor versionedPropertyDescriptor = new VersionedPropertyDescriptor();
versionedPropertyDescriptor.setName(name);
versionedPropertyDescriptor.setSensitive(isSensitive);
return versionedPropertyDescriptor;
}
@BeforeEach
public void setup() {
when(mockPropertyEncryptor.encrypt(anyString())).thenReturn(randomAlphabetic(5));
testEncryptor = new StandardFlowPropertyEncryptor(mockPropertyEncryptor, mockRunTimeManifest);
}
@Test
public void shouldEncryptParameterContextsSensitiveVariables() {
VersionedDataflow testFlow = flowWithParameterContexts();
VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow);
encryptedFlow.getParameterContexts().stream()
.flatMap(context -> context.getParameters().stream())
.forEach(parameter -> {
if (parameter.isSensitive()) {
assertTrue(parameter.getValue().startsWith(FlowSerializer.ENC_PREFIX));
} else {
assertFalse(parameter.getValue().startsWith(FlowSerializer.ENC_PREFIX));
}
});
}
@Test
public void shouldEncryptPropertiesUsingDescriptorsFromFlow() {
VersionedDataflow testFlow = flowWithPropertyDescriptors();
VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow);
verify(mockRunTimeManifest, never()).getBundles();
assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow);
}
@Test
public void shouldEncryptPropertiesUsingDescriptorsFromRuntimeManifest() {
VersionedDataflow testFlow = flowWithoutPropertyDescriptors();
when(mockRunTimeManifest.getBundles()).thenReturn(runTimeManifestBundles());
VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow);
assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow);
}
private VersionedDataflow flowWithParameterContexts() {
VersionedDataflow versionedDataflow = new VersionedDataflow();
versionedDataflow.setRootGroup(new VersionedProcessGroup());
versionedDataflow.setParameterContexts(
List.of(
parameterContext(
parameter(NON_SENSITIVE_1, NON_SENSITIVE_1, false),
parameter(SENSITIVE_1, SENSITIVE_1, true)
),
parameterContext(
parameter(NON_SENSITIVE_2, NON_SENSITIVE_2, false)
),
parameterContext(
parameter(SENSITIVE_3, SENSITIVE_3, true)
)
)
);
return versionedDataflow;
}
private VersionedParameterContext parameterContext(VersionedParameter... parameters) {
VersionedParameterContext versionedParameterContext = new VersionedParameterContext();
versionedParameterContext.setParameters(Set.of(parameters));
return versionedParameterContext;
}
private VersionedParameter parameter(String name, String value, boolean sensitive) {
VersionedParameter versionedParameter = new VersionedParameter();
versionedParameter.setName(name);
versionedParameter.setValue(value);
versionedParameter.setSensitive(sensitive);
return versionedParameter;
}
private VersionedDataflow flowWithPropertyDescriptors() {
VersionedDataflow versionedDataflow = new VersionedDataflow();
VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
versionedProcessGroup.setProcessors(
Set.of(
versionedProcessor(PROCESSOR_TYPE_1, PARAMETERS1, DESCRIPTORS1),
versionedProcessor(PROCESSOR_TYPE_2, PARAMETERS2, DESCRIPTORS2),
versionedProcessor(PROCESSOR_TYPE_3, PARAMETERS3, DESCRIPTORS3)
));
versionedProcessGroup.setControllerServices(
Set.of(
versionedControllerService(CONTROLLER_SERVICE_TYPE_1, PARAMETERS1, DESCRIPTORS1),
versionedControllerService(CONTROLLER_SERVICE_TYPE_2, PARAMETERS2, DESCRIPTORS2),
versionedControllerService(CONTROLLER_SERVICE_TYPE_3, PARAMETERS3, DESCRIPTORS3)
)
);
versionedDataflow.setRootGroup(versionedProcessGroup);
return versionedDataflow;
}
private VersionedDataflow flowWithoutPropertyDescriptors() {
VersionedDataflow versionedDataflow = new VersionedDataflow();
VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
versionedProcessGroup.setProcessors(
Set.of(
versionedProcessor(PROCESSOR_TYPE_1, PARAMETERS1, Map.of()),
versionedProcessor(PROCESSOR_TYPE_2, PARAMETERS2, Map.of()),
versionedProcessor(PROCESSOR_TYPE_3, PARAMETERS3, Map.of())
));
versionedProcessGroup.setControllerServices(
Set.of(
versionedControllerService(CONTROLLER_SERVICE_TYPE_1, PARAMETERS1, Map.of()),
versionedControllerService(CONTROLLER_SERVICE_TYPE_2, PARAMETERS2, Map.of()),
versionedControllerService(CONTROLLER_SERVICE_TYPE_3, PARAMETERS3, Map.of())
)
);
versionedDataflow.setRootGroup(versionedProcessGroup);
return versionedDataflow;
}
private VersionedProcessor versionedProcessor(String processorType, Map<String, String> properties, Map<String, VersionedPropertyDescriptor> propertyDescriptors) {
VersionedProcessor versionedProcessor = new VersionedProcessor();
versionedProcessor.setIdentifier(randomUUID().toString());
versionedProcessor.setType(processorType);
versionedProcessor.setProperties(properties);
versionedProcessor.setPropertyDescriptors(propertyDescriptors);
return versionedProcessor;
}
private VersionedControllerService versionedControllerService(String controllerServiceType, Map<String, String> properties,
Map<String, VersionedPropertyDescriptor> propertyDescriptors) {
VersionedControllerService versionedControllerService = new VersionedControllerService();
versionedControllerService.setIdentifier(randomUUID().toString());
versionedControllerService.setType(controllerServiceType);
versionedControllerService.setProperties(properties);
versionedControllerService.setPropertyDescriptors(propertyDescriptors);
return versionedControllerService;
}
private List<Bundle> runTimeManifestBundles() {
return List.of(
bundle(
List.of(processorDefinition(PROCESSOR_TYPE_1, DESCRIPTORS1), processorDefinition(PROCESSOR_TYPE_2, DESCRIPTORS2)),
List.of(controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_1, DESCRIPTORS1))
),
bundle(
List.of(processorDefinition(PROCESSOR_TYPE_3, DESCRIPTORS3)),
List.of(controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_2, DESCRIPTORS2), controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_3, DESCRIPTORS3))
)
);
}
private Bundle bundle(List<ProcessorDefinition> processorDefinition, List<ControllerServiceDefinition> controllerServiceDefinition) {
Bundle bundle = new Bundle();
ComponentManifest componentManifest = new ComponentManifest();
componentManifest.setProcessors(processorDefinition);
componentManifest.setControllerServices(controllerServiceDefinition);
bundle.setComponentManifest(componentManifest);
return bundle;
}
private ProcessorDefinition processorDefinition(String processorType, Map<String, VersionedPropertyDescriptor> propertyDescriptors) {
ProcessorDefinition processorDefinition = new ProcessorDefinition();
processorDefinition.setType(processorType);
processorDefinition.setPropertyDescriptors(
convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(propertyDescriptors)
);
return processorDefinition;
}
private ControllerServiceDefinition controllerServiceDefinition(String controllerServiceType, Map<String, VersionedPropertyDescriptor> propertyDescriptors) {
ControllerServiceDefinition controllerServiceDefinition = new ControllerServiceDefinition();
controllerServiceDefinition.setType(controllerServiceType);
controllerServiceDefinition.setPropertyDescriptors(
convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(propertyDescriptors)
);
return controllerServiceDefinition;
}
private LinkedHashMap<String, PropertyDescriptor> convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(Map<String, VersionedPropertyDescriptor> propertyDescriptors) {
return propertyDescriptors.values()
.stream()
.map(propertyDescriptor -> entry(propertyDescriptor.getName(), convertPropertyDescriptor(propertyDescriptor)))
.collect(toMap(Entry::getKey, Entry::getValue, (l, r) -> l, LinkedHashMap::new));
}
private PropertyDescriptor convertPropertyDescriptor(VersionedPropertyDescriptor versionedPropertyDescriptor) {
PropertyDescriptor propertyDescriptor = new PropertyDescriptor();
propertyDescriptor.setName(versionedPropertyDescriptor.getName());
propertyDescriptor.setSensitive(versionedPropertyDescriptor.isSensitive());
return propertyDescriptor;
}
private void assertSensitiveFlowComponentPropertiesAreEncoded(VersionedDataflow encryptedFlow) {
Stream.of(
encryptedFlow.getRootGroup().getProcessors(),
encryptedFlow.getRootGroup().getControllerServices()
)
.flatMap(Set::stream)
.map(VersionedConfigurableExtension::getProperties)
.flatMap(properties -> properties.entrySet().stream())
.forEach(propertyEntry -> {
if (propertyEntry.getKey().startsWith(SENSITIVE_PROPERTY_NAME_PREFIX)) {
assertTrue(propertyEntry.getValue().startsWith(FlowSerializer.ENC_PREFIX));
} else {
assertFalse(propertyEntry.getValue().startsWith(FlowSerializer.ENC_PREFIX));
}
}
);
}
}

View File

@ -46,6 +46,8 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TR
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@ -87,6 +89,7 @@ import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.encrypt.PropertyEncryptorBuilder;
import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser;
import org.apache.nifi.manifest.RuntimeManifestService;
@ -97,7 +100,10 @@ import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.FormatUtils;
@ -214,9 +220,13 @@ public class C2NifiClientService {
UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
PropertiesPersister propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
FlowEnrichService flowEnrichService = new FlowEnrichService(niFiProperties);
UpdateConfigurationStrategy updateConfigurationStrategy =
new DefaultUpdateConfigurationStrategy(flowController, flowService, flowEnrichService, niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
FlowPropertyEncryptor flowPropertyEncryptor = new StandardFlowPropertyEncryptor(
new PropertyEncryptorBuilder(niFiProperties.getProperty(SENSITIVE_PROPS_KEY))
.setAlgorithm(niFiProperties.getProperty(SENSITIVE_PROPS_ALGORITHM)).build(),
runtimeManifestService.getManifest());
UpdateConfigurationStrategy updateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(flowController, flowService,
new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor,
StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
return new C2OperationHandlerProvider(List.of(
new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider),

View File

@ -47,9 +47,12 @@ import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,15 +69,20 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
private final FlowController flowController;
private final FlowService flowService;
private final FlowEnrichService flowEnrichService;
private final FlowPropertyEncryptor flowPropertyEncryptor;
private final FlowSerDeService flowSerDeService;
private final Path flowConfigurationFile;
private final Path backupFlowConfigurationFile;
private final Path rawFlowConfigurationFile;
private final Path backupRawFlowConfigurationFile;
public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService,
FlowPropertyEncryptor flowPropertyEncryptor, FlowSerDeService flowSerDeService, String flowConfigurationFile) {
this.flowController = flowController;
this.flowService = flowService;
this.flowEnrichService = flowEnrichService;
this.flowPropertyEncryptor = flowPropertyEncryptor;
this.flowSerDeService = flowSerDeService;
Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
this.flowConfigurationFile = flowConfigurationFilePath;
this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
@ -90,12 +98,21 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
}
try {
byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow);
VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
byte[] serializedPropertyEncryptedRawDataFlow = flowSerDeService.serialize(propertyEncryptedRawDataFlow);
VersionedDataflow enrichedFlowCandidate = flowEnrichService.enrichFlow(propertyEncryptedRawDataFlow);
byte[] serializedEnrichedFlowCandidate = flowSerDeService.serialize(enrichedFlowCandidate);
backup(flowConfigurationFile, backupFlowConfigurationFile);
backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
persist(enrichedFlowCandidate, flowConfigurationFile, true);
persist(rawFlow, rawFlowConfigurationFile, false);
persist(serializedPropertyEncryptedRawDataFlow, rawFlowConfigurationFile, false);
persist(serializedEnrichedFlowCandidate, flowConfigurationFile, true);
reloadFlow();
return true;
} catch (IllegalStateException e) {
LOGGER.error("Configuration update failed. Reverting and reloading previous flow", e);

View File

@ -47,8 +47,11 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.services.FlowService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -64,8 +67,13 @@ public class DefaultUpdateConfigurationStrategyTest {
private static final byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
private static final byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
private static final byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
private static final VersionedDataflow NEW_RAW_FLOW_CONFIG = new VersionedDataflow();
private static final byte[] NEW_ENCRYPTED_FLOW_CONFIG_CONTENT = "original_encrypted_content".getBytes(UTF_8);
private static final VersionedDataflow NEW_ENCRYPTED_FLOW_CONFIG = new VersionedDataflow();
private static final byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
private static final VersionedDataflow NEW_ENRICHED_FLOW_CONFIG = new VersionedDataflow();
@TempDir
private File tempDir;
@ -77,6 +85,10 @@ public class DefaultUpdateConfigurationStrategyTest {
@Mock
private FlowEnrichService mockFlowEnrichService;
@Mock
private FlowPropertyEncryptor mockFlowPropertyEncryptor;
@Mock
private FlowSerDeService mockFlowSerDeService;
@Mock
private FlowManager mockFlowManager;
@Mock
private ProcessGroup mockProcessGroup;
@ -96,7 +108,8 @@ public class DefaultUpdateConfigurationStrategyTest {
rawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
backupRawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
testUpdateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, flowConfigurationFile.toString());
testUpdateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService,
mockFlowPropertyEncryptor, mockFlowSerDeService, flowConfigurationFile.toString());
writeGzipFile(flowConfigurationFile, ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
writePlainTextFile(rawFlowConfigurationFile, ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
@ -105,7 +118,11 @@ public class DefaultUpdateConfigurationStrategyTest {
@Test
public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
// given
when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG);
when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG);
when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT);
when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG);
when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup);
@ -117,7 +134,7 @@ public class DefaultUpdateConfigurationStrategyTest {
assertTrue(exists(flowConfigurationFile));
assertTrue(exists(rawFlowConfigurationFile));
assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
assertArrayEquals(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
assertFalse(exists(backupFlowConfigurationFile));
assertFalse(exists(backupRawFlowConfigurationFile));
verify(mockFlowService, times(1)).load(null);
@ -128,7 +145,11 @@ public class DefaultUpdateConfigurationStrategyTest {
@Test
public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IOException {
// given
when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG);
when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG);
when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT);
when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG);
when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup);
doThrow(new IOException()).when(mockFlowService).load(null);