NIFI-8499 - Added encrypted FlowFile repository swap file implementation

This closes #5122

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2021-06-03 15:37:27 -04:00 committed by exceptionfactory
parent 07ff4f2592
commit a3c1cd074b
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 239 additions and 10 deletions

View File

@ -427,7 +427,7 @@ To enable authentication via SAML the following properties must be configured in
|`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion attribute containing group names the user belongs to. This property is optional, but if populated the groups will be passed along to the authorization process. |`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion attribute containing group names the user belongs to. This property is optional, but if populated the groups will be passed along to the authorization process.
|`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the generated service provider metadata. |`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the generated service provider metadata.
|`nifi.security.user.saml.request.signing.enabled`| Controls the value of `AuthnRequestsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates that the service provider (i.e. NiFi) should not sign authentication requests sent to the identity provider, but the requests may still need to be signed if the identity provider indicates `WantAuthnRequestSigned=true`. |`nifi.security.user.saml.request.signing.enabled`| Controls the value of `AuthnRequestsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates that the service provider (i.e. NiFi) should not sign authentication requests sent to the identity provider, but the requests may still need to be signed if the identity provider indicates `WantAuthnRequestSigned=true`.
|`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indictaes that the identity provider should sign assertions, but some identity providers may provide their own configuration for controlling whether assertions are signed. |`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates that the identity provider should sign assertions, but some identity providers may provide their own configuration for controlling whether assertions are signed.
|`nifi.security.user.saml.signature.algorithm`| The algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used. |`nifi.security.user.saml.signature.algorithm`| The algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used.
|`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used. |`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used.
|`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML messages for debugging purposes. |`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML messages for debugging purposes.
@ -2961,7 +2961,9 @@ available again. These properties govern how that process occurs.
|==== |====
|*Property*|*Description* |*Property*|*Description*
|`nifi.swap.manager.implementation`|The Swap Manager implementation. The default value is `org.apache.nifi.controller.FileSystemSwapManager` and should not be changed. |`nifi.swap.manager.implementation`| The Swap Manager implementation. The default value is `org.apache.nifi.controller.FileSystemSwapManager`.
There is an alternate implementation, `EncryptedFileSystemSwapManager`, that encrypts the swap file content on
disk. The encryption key configured for the FlowFile repository is used to perform the encryption, using the AES-GCM algorithm.
|`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap FlowFile information to disk. The default value is `20000`. |`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap FlowFile information to disk. The default value is `20000`.
|==== |====

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.security.kms.CryptoUtils;
import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.security.kms.KeyProvider;
import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
/**
* <p>
* An implementation of {@link FlowFileSwapManager} that swaps FlowFiles
* to/from local disk. The swap file is encrypted using AES/GCM, using the
* encryption key defined in nifi.properties for the FlowFile repository.
* </p>
*/
public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
private static final String CIPHER_TRANSFORMATION = "AES/GCM/NoPadding";
private static final int SIZE_IV_AES_BYTES = 16;
private static final int SIZE_TAG_GCM_BITS = 128;
private static final Logger logger = LoggerFactory.getLogger(EncryptedFileSystemSwapManager.class);
private static final SecureRandom secureRandom = new SecureRandom();
private final SecretKey secretKey;
public EncryptedFileSystemSwapManager(final NiFiProperties nifiProperties)
throws IOException, EncryptionException, GeneralSecurityException {
super(nifiProperties);
// acquire reference to FlowFileRepository key
final FlowFileRepositoryEncryptionConfiguration configuration = new FlowFileRepositoryEncryptionConfiguration(nifiProperties);
if (!CryptoUtils.isValidRepositoryEncryptionConfiguration(configuration)) {
logger.error("The flowfile repository encryption configuration is not valid (see above). Shutting down...");
throw new EncryptionException("The flowfile repository encryption configuration is not valid");
}
final KeyProvider keyProvider = RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(configuration);
this.secretKey = keyProvider.getKey(configuration.getEncryptionKeyId());
}
protected InputStream getInputStream(final File file) throws IOException {
final FileInputStream fis = new FileInputStream(file);
try {
final byte[] iv = new byte[SIZE_IV_AES_BYTES];
final int ivBytesRead = fis.read(iv);
if (ivBytesRead != SIZE_IV_AES_BYTES) {
throw new IOException(String.format(
"problem reading IV [expected=%d, actual=%d]", SIZE_IV_AES_BYTES, ivBytesRead));
}
final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
cipher.init(Cipher.DECRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
return new CipherInputStream(fis, cipher);
} catch (GeneralSecurityException e) {
throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
}
}
protected OutputStream getOutputStream(final File file) throws IOException {
final byte[] iv = new byte[SIZE_IV_AES_BYTES];
secureRandom.nextBytes(iv);
final FileOutputStream fos = new FileOutputStream(file);
fos.write(iv);
try {
final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
cipher.init(Cipher.ENCRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
return new CipherOutputStream(fos, cipher);
} catch (GeneralSecurityException e) {
throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
}
}
}

View File

@ -119,6 +119,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
this.flowFileRepository = initializationContext.getFlowFileRepository(); this.flowFileRepository = initializationContext.getFlowFileRepository();
} }
protected InputStream getInputStream(final File file) throws IOException {
return new FileInputStream(file);
}
protected OutputStream getOutputStream(final File file) throws IOException {
return new FileOutputStream(file);
}
@Override @Override
public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException { public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
@ -135,14 +142,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final String swapLocation = swapFile.getAbsolutePath(); final String swapLocation = swapFile.getAbsolutePath();
final SwapSerializer serializer = new SchemaSwapSerializer(); final SwapSerializer serializer = new SchemaSwapSerializer();
try (final FileOutputStream fos = new FileOutputStream(swapTempFile); try (final OutputStream os = getOutputStream(swapTempFile);
final OutputStream out = new BufferedOutputStream(fos)) { final OutputStream out = new BufferedOutputStream(os)) {
out.write(MAGIC_HEADER); out.write(MAGIC_HEADER);
final DataOutputStream dos = new DataOutputStream(out); final DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(serializer.getSerializationName()); dos.writeUTF(serializer.getSerializationName());
serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out); serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
fos.getFD().sync(); out.flush();
} catch (final IOException ioe) { } catch (final IOException ioe) {
// we failed to write out the entire swap file. Delete the temporary file, if we can. // we failed to write out the entire swap file. Delete the temporary file, if we can.
swapTempFile.delete(); swapTempFile.delete();
@ -188,8 +195,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found"); throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
} }
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream is = getInputStream(swapFile);
final InputStream bis = new BufferedInputStream(fis); final InputStream bis = new BufferedInputStream(is);
final DataInputStream in = new DataInputStream(bis)) { final DataInputStream in = new DataInputStream(bis)) {
final SwapDeserializer deserializer = createSwapDeserializer(in); final SwapDeserializer deserializer = createSwapDeserializer(in);
@ -318,7 +325,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
// Read the queue identifier from the swap file to check if the swap file is for this queue // Read the queue identifier from the swap file to check if the swap file is for this queue
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream fis = getInputStream(swapFile);
final InputStream bufferedIn = new BufferedInputStream(fis); final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) { final DataInputStream in = new DataInputStream(bufferedIn)) {
@ -351,7 +358,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final File swapFile = new File(swapLocation); final File swapFile = new File(swapLocation);
// read record from disk via the swap file // read record from disk via the swap file
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream fis = getInputStream(swapFile);
final InputStream bufferedIn = new BufferedInputStream(fis); final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) { final DataInputStream in = new DataInputStream(bufferedIn)) {

View File

@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.controller.FileSystemSwapManager org.apache.nifi.controller.FileSystemSwapManager
org.apache.nifi.controller.EncryptedFileSystemSwapManager

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.security.kms.StaticKeyProvider;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
* Test cases for {@link EncryptedFileSystemSwapManager}.
*/
public class TestEncryptedFileSystemSwapManager {
private static final Logger logger = Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
/**
* Test a simple swap to disk / swap from disk operation. Configured to use {@link StaticKeyProvider}.
*/
@Test
public void testSwapOutSwapIn() throws GeneralSecurityException, EncryptionException, IOException {
// use temp folder on filesystem to temporarily hold swap content (clean up after test)
final File folderRepository = Files.createTempDirectory(getClass().getSimpleName()).toFile();
logger.info(folderRepository.getPath());
folderRepository.deleteOnExit();
new File(folderRepository, "swap").deleteOnExit();
// configure a nifi properties for encrypted swap file
final Properties properties = new Properties();
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, StaticKeyProvider.class.getName());
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, StringUtils.repeat("00", 32));
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, folderRepository.getPath());
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
// generate some flow file content to swap to disk
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; (i < 100); ++i) {
flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
}
// setup for test case
final FlowFileSwapManager swapManager = createSwapManager(nifiProperties);
final String queueIdentifier = UUID.randomUUID().toString();
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
// swap out to disk; pull content back from disk
final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue, "partition-1");
final SwapContents swapContents = swapManager.swapIn(swapPath, flowFileQueue);
// verify recovery of original content
final List<FlowFileRecord> flowFilesRecovered = swapContents.getFlowFiles();
Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
}
/**
* Borrowed from "nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
*/
private FlowFileSwapManager createSwapManager(NiFiProperties nifiProperties)
throws IOException, GeneralSecurityException, EncryptionException {
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
final FileSystemSwapManager swapManager = new EncryptedFileSystemSwapManager(nifiProperties);
final ResourceClaimManager resourceClaimManager = Mockito.mock(ResourceClaimManager.class);
final SwapManagerInitializationContext context = Mockito.mock(SwapManagerInitializationContext.class);
when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
swapManager.initialize(context);
return swapManager;
}
}