NIFI-10012 Added XML and JSON FlowEncryptor implementations

This closes #6054

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2022-05-12 17:59:44 -07:00 committed by exceptionfactory
parent e618e85da7
commit 24adc73862
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
7 changed files with 350 additions and 60 deletions

View File

@ -34,5 +34,15 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-xml-processing</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow.encryptor;
import org.apache.nifi.encrypt.PropertyEncryptor;
import java.util.regex.Pattern;
public abstract class AbstractFlowEncryptor implements FlowEncryptor {
protected static final Pattern ENCRYPTED_PATTERN = Pattern.compile("^enc\\{([^}]+?)}$");
protected static final int FIRST_GROUP = 1;
protected static final String ENCRYPTED_FORMAT = "enc{%s}";
protected String getOutputEncrypted(final String inputEncrypted, final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
final String inputDecrypted = inputEncryptor.decrypt(inputEncrypted);
final String outputEncrypted = outputEncryptor.encrypt(inputDecrypted);
return String.format(ENCRYPTED_FORMAT, outputEncrypted);
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow.encryptor;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.encrypt.PropertyEncryptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.regex.Matcher;
public class JsonFlowEncryptor extends AbstractFlowEncryptor {
@Override
public void processFlow(final InputStream inputStream, final OutputStream outputStream,
final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
final JsonFactory factory = new JsonFactory();
try (final JsonGenerator generator = factory.createGenerator(outputStream)){
try (final JsonParser parser = factory.createParser(inputStream)) {
parser.setCodec(new ObjectMapper());
processJsonByTokens(parser, generator, inputEncryptor, outputEncryptor);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed Processing Flow Configuration", e);
}
}
private void processJsonByTokens(final JsonParser parser, final JsonGenerator generator,
final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) throws IOException {
JsonToken token = parser.nextToken();
while (token != null) {
switch (token) {
case NOT_AVAILABLE:
break;
case START_OBJECT:
generator.writeStartObject();
break;
case END_OBJECT:
generator.writeEndObject();
break;
case START_ARRAY:
generator.writeStartArray();
break;
case END_ARRAY:
generator.writeEndArray();
break;
case FIELD_NAME:
generator.writeFieldName(parser.getValueAsString());
break;
case VALUE_EMBEDDED_OBJECT:
generator.writeEmbeddedObject(parser.getEmbeddedObject());
break;
case VALUE_STRING:
final String value = parser.getValueAsString();
final Matcher matcher = ENCRYPTED_PATTERN.matcher(value);
if (matcher.matches()) {
generator.writeString(getOutputEncrypted(matcher.group(FIRST_GROUP), inputEncryptor, outputEncryptor));
} else {
generator.writeString(value);
}
break;
case VALUE_NUMBER_INT:
generator.writeNumber(parser.getIntValue());
break;
case VALUE_NUMBER_FLOAT:
generator.writeNumber(parser.getFloatValue());
break;
case VALUE_TRUE:
generator.writeBoolean(true);
break;
case VALUE_FALSE:
generator.writeBoolean(false);
break;
case VALUE_NULL:
generator.writeNull();
break;
default:
throw new IllegalStateException(String.format("Token unrecognized [%s]", token));
}
token = parser.nextToken();
}
}
}

View File

@ -18,26 +18,17 @@ package org.apache.nifi.flow.encryptor;
import org.apache.nifi.encrypt.PropertyEncryptor;
import java.io.BufferedReader;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Standard Flow Encryptor handles reading Input Steam and writing Output Stream
*/
public class StandardFlowEncryptor implements FlowEncryptor {
private static final Pattern ENCRYPTED_PATTERN = Pattern.compile("enc\\{([^\\}]+?)\\}");
private static final int FIRST_GROUP = 1;
private static final String ENCRYPTED_FORMAT = "enc{%s}";
private static final int XML_DECLARATION = '<';
/**
* Process Flow Configuration Stream replacing existing encrypted properties with new encrypted properties
@ -48,39 +39,17 @@ public class StandardFlowEncryptor implements FlowEncryptor {
* @param outputEncryptor Property Encryptor for Output Configuration
*/
@Override
public void processFlow(final InputStream inputStream, final OutputStream outputStream, final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
try (final PrintWriter writer = new PrintWriter(new OutputStreamWriter(outputStream))) {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
reader.lines().forEach(line -> {
final Matcher matcher = ENCRYPTED_PATTERN.matcher(line);
final StringBuffer sb = new StringBuffer();
boolean matched = false;
while (matcher.find()) {
final String outputEncrypted = getOutputEncrypted(matcher.group(FIRST_GROUP), inputEncryptor, outputEncryptor);
matcher.appendReplacement(sb, outputEncrypted);
matched = true;
}
final String outputLine;
if (matched) {
matcher.appendTail(sb);
outputLine = sb.toString();
} else {
outputLine = line;
}
writer.println(outputLine);
});
}
public void processFlow(final InputStream inputStream, final OutputStream outputStream,
final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
final BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
bufferedInputStream.mark(1);
try {
final int firstByte = bufferedInputStream.read();
bufferedInputStream.reset();
final FlowEncryptor flowEncryptor = (firstByte == XML_DECLARATION) ? new XmlFlowEncryptor() : new JsonFlowEncryptor();
flowEncryptor.processFlow(bufferedInputStream, outputStream, inputEncryptor, outputEncryptor);
} catch (final IOException e) {
throw new UncheckedIOException("Failed Processing Flow Configuration", e);
throw new UncheckedIOException(e);
}
}
private String getOutputEncrypted(final String inputEncrypted, final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
final String inputDecrypted = inputEncryptor.decrypt(inputEncrypted);
final String outputEncrypted = outputEncryptor.encrypt(inputDecrypted);
return String.format(ENCRYPTED_FORMAT, outputEncrypted);
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow.encryptor;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.xml.processing.stream.StandardXMLEventReaderProvider;
import org.apache.nifi.xml.processing.stream.XMLEventReaderProvider;
import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.Characters;
import javax.xml.stream.events.XMLEvent;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
public class XmlFlowEncryptor extends AbstractFlowEncryptor {
private static final XMLEventReaderProvider eventReaderProvider = new StandardXMLEventReaderProvider();
@Override
public void processFlow(final InputStream inputStream, final OutputStream outputStream,
final PropertyEncryptor inputEncryptor, final PropertyEncryptor outputEncryptor) {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance();
final XMLEventFactory eventFactory = XMLEventFactory.newInstance();
try {
final XMLEventReader reader = eventReaderProvider.getEventReader(new StreamSource(inputStream));
final XMLEventWriter writer = xmlOutputFactory.createXMLEventWriter(outputStream, StandardCharsets.UTF_8.name());
while (reader.hasNext()) {
final XMLEvent event = reader.nextEvent();
if (event.getEventType() == XMLEvent.CHARACTERS) {
final Characters characters = event.asCharacters();
final String value = characters.getData();
final Matcher matcher = ENCRYPTED_PATTERN.matcher(value);
if (matcher.matches()) {
final String processedValue = getOutputEncrypted(matcher.group(FIRST_GROUP), inputEncryptor, outputEncryptor);
writer.add(eventFactory.createCharacters(processedValue));
} else {
writer.add(characters);
}
} else if (event.getEventType() == XMLEvent.START_DOCUMENT) {
writer.add(event);
writer.add(eventFactory.createSpace(System.lineSeparator()));
} else {
writer.add(event);
}
}
writer.flush();
writer.close();
reader.close();
outputStream.close();
inputStream.close();
} catch (final XMLStreamException e) {
throw new RuntimeException("Flow XML Processing Failed", e);
} catch (final IOException e) {
throw new UncheckedIOException("Failed Processing Flow Configuration", e);
}
}
}

View File

@ -24,24 +24,27 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class StandardFlowEncryptorTest {
private static final String INPUT_KEY = UUID.randomUUID().toString();
private static final String OUTPUT_KEY = UUID.randomUUID().toString();
private static final String ENCRYPTED_FORMAT = "enc{%s}";
private static final Pattern OUTPUT_PATTERN = Pattern.compile("^enc\\{([^}]+?)}$");
private static final String PATTERN_REGEX = "enc\\{([^}]+?)}";
private static final Pattern PATTERN = Pattern.compile(PATTERN_REGEX);
private PropertyEncryptor inputEncryptor;
@ -49,6 +52,7 @@ public class StandardFlowEncryptorTest {
private StandardFlowEncryptor flowEncryptor;
@BeforeEach
public void setEncryptors() {
inputEncryptor = getPropertyEncryptor(INPUT_KEY, EncryptionMethod.MD5_256AES.getAlgorithm());
@ -60,16 +64,17 @@ public class StandardFlowEncryptorTest {
public void testProcessEncrypted() {
final String property = StandardFlowEncryptorTest.class.getSimpleName();
final String encryptedProperty = String.format(ENCRYPTED_FORMAT, inputEncryptor.encrypt(property));
final String encryptedRow = String.format("%s%n", encryptedProperty);
final String encryptedRow = String.format("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>%n" +
"<test>%s</test>", encryptedProperty);
final InputStream inputStream = new ByteArrayInputStream(encryptedRow.getBytes(StandardCharsets.UTF_8));
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
flowEncryptor.processFlow(inputStream, outputStream, inputEncryptor, outputEncryptor);
final String outputEncrypted = new String(outputStream.toByteArray());
final Matcher matcher = OUTPUT_PATTERN.matcher(outputEncrypted);
assertTrue(String.format("Encrypted Pattern not found [%s]", outputEncrypted), matcher.find());
final String outputEncrypted = outputStream.toString();
final Matcher matcher = PATTERN.matcher(outputEncrypted);
assertTrue(matcher.find(), String.format("Encrypted Pattern not found [%s]", outputEncrypted));
final String outputEncryptedProperty = matcher.group(1);
final String outputDecrypted = outputEncryptor.decrypt(outputEncryptedProperty);
@ -78,18 +83,105 @@ public class StandardFlowEncryptorTest {
@Test
public void testProcessNoEncrypted() {
final String property = String.format("%s%n", StandardFlowEncryptorTest.class.getSimpleName());
final String property = String.format("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>%n" +
"<test>%s</test>", StandardFlowEncryptorTest.class.getSimpleName());
final InputStream inputStream = new ByteArrayInputStream(property.getBytes(StandardCharsets.UTF_8));
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
flowEncryptor.processFlow(inputStream, outputStream, inputEncryptor, outputEncryptor);
final String outputProperty = new String(outputStream.toByteArray());
assertEquals(property, outputProperty);
final String outputProperty = outputStream.toString();
assertEquals(removeXmlDeclaration(property).trim(), removeXmlDeclaration(outputProperty).trim());
}
@Test
public void testProcessJson() throws IOException {
final String password = StandardFlowEncryptorTest.class.getSimpleName();
final String encryptedPassword = String.format(ENCRYPTED_FORMAT, inputEncryptor.encrypt(password));
final String sampleFlowJson = getSampleFlowJson(encryptedPassword);
try (final InputStream inputStream = new ByteArrayInputStream(sampleFlowJson.getBytes(StandardCharsets.UTF_8))) {
try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
flowEncryptor.processFlow(inputStream, outputStream, inputEncryptor, outputEncryptor);
final String outputFlowJson = outputStream.toString();
compareFlow(sampleFlowJson.trim(), outputFlowJson.trim());
}
}
}
@Test
public void testProcessXml() throws IOException {
final String password = StandardFlowEncryptorTest.class.getSimpleName();
final String encryptedPassword = String.format(ENCRYPTED_FORMAT, inputEncryptor.encrypt(password));
final String sampleFlowXml = getSampleFlowXml(encryptedPassword);
try (final InputStream inputStream = new ByteArrayInputStream(sampleFlowXml.getBytes(StandardCharsets.UTF_8))) {
try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
flowEncryptor.processFlow(inputStream, outputStream, inputEncryptor, outputEncryptor);
final String outputXml = outputStream.toString();
compareFlow(removeXmlDeclaration(sampleFlowXml).trim(), removeXmlDeclaration(outputXml).trim());
}
}
}
private PropertyEncryptor getPropertyEncryptor(final String propertiesKey, final String propertiesAlgorithm) {
return new PropertyEncryptorBuilder(propertiesKey).setAlgorithm(propertiesAlgorithm).build();
}
private void compareFlow(final String sampleFlow, final String outputFlow) {
final Matcher inputMatcher = PATTERN.matcher(sampleFlow);
final Matcher outputMatcher = PATTERN.matcher(outputFlow);
assertTrue(inputMatcher.find() && outputMatcher.find());
assertEquals(inputEncryptor.decrypt(inputMatcher.group(1)), outputEncryptor.decrypt(outputMatcher.group(1)));
assertEquals(sampleFlow.replaceAll(PATTERN_REGEX, ""), outputFlow.replaceAll(PATTERN_REGEX, ""));
}
private String getSampleFlowJson(final String password) {
Objects.requireNonNull(password);
return String.format("{\"properties\":{\"username\":\"sample_username\",\"password\":\"%s\"}}", password);
}
private String getSampleFlowXml(final String password) {
Objects.requireNonNull(password);
final String flowXml = String.format("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>%n" +
"<processor>%n" +
"\t<property>%n" +
"\t\t<name>Username</name>%n" +
"\t\t<value>SAMPLE_USERNAME</value>%n" +
"\t</property>%n" +
"\t<property>%n" +
"\t\t<name>Password</name>%n" +
"\t\t<value>%s</value>%n" +
"\t</property>%n" +
"</processor>", password);
return getProcessedFlowXml(flowXml);
}
private String getProcessedFlowXml(final String flowXml) {
final PropertyEncryptor encryptor = new PropertyEncryptor() {
@Override
public String encrypt(String property) {
return property;
}
@Override
public String decrypt(String encryptedProperty) {
return encryptedProperty;
}
};
final InputStream inputStream = new ByteArrayInputStream(flowXml.getBytes(StandardCharsets.UTF_8));
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
flowEncryptor.processFlow(inputStream, outputStream, encryptor, encryptor);
return outputStream.toString();
}
private String removeXmlDeclaration(final String xmlFlow) {
return xmlFlow.replaceAll("<\\?xml.+\\?>", "");
}
}

View File

@ -4057,8 +4057,8 @@ class ConfigEncryptionToolTest extends GroovyLogTestCase {
assert migratedCipherTexts.size() == cipherTextCount
// Ensure that everything else is identical
assert flowXmlFile.text.replaceAll(WFXCTR, "") ==
workingFile.text.replaceAll(WFXCTR, "")
assertEquals(removeXmlDeclarationAndComments(flowXmlFile.text).replaceAll(WFXCTR, "").trim(),
removeXmlDeclarationAndComments(workingFile.text).replaceAll(WFXCTR, "").trim())
}
@ -4109,8 +4109,8 @@ class ConfigEncryptionToolTest extends GroovyLogTestCase {
assert newCipherTexts.size() == ORIGINAL_CIPHER_TEXT_COUNT
// Ensure that everything else is identical
assert new File(workingFile.path).text.replaceAll(WFXCTR, "") ==
flowXmlFile.text.replaceAll(WFXCTR, "")
assertEquals(removeXmlDeclarationAndComments(new File(workingFile.path).text).replaceAll(WFXCTR, "").trim(),
removeXmlDeclarationAndComments(flowXmlFile.text).replaceAll(WFXCTR, "").trim())
// Update the "source" XML content for the next iteration
currentXmlContent = tool.loadFlowXml(workingFile.path)
@ -4234,7 +4234,7 @@ class ConfigEncryptionToolTest extends GroovyLogTestCase {
logger.info("Loaded flow.xml.gz: \n${readXmlContent}")
// Assert
assert readXmlContent == xmlContent
assert readXmlContent.trim() == xmlContent.trim()
}
@Test
@ -4700,5 +4700,7 @@ class ConfigEncryptionToolTest extends GroovyLogTestCase {
fieldsFound
}
// TODO: Test with 128/256-bit available
private String removeXmlDeclarationAndComments(final String xmlFlow) {
return xmlFlow.replaceAll("<\\?xml.+\\?>", "").replaceAll("(?s)<!--.*?-->", "")
}
}