mirror of https://github.com/apache/nifi.git
NIFI-11895 Removed deprecated Repository Encryption configuration
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #7557.
This commit is contained in:
parent
0446990d54
commit
bbd8fb6f63
|
@ -36,7 +36,6 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -109,11 +108,6 @@ public class NiFiProperties extends ApplicationProperties {
|
|||
public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled";
|
||||
public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency";
|
||||
public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url";
|
||||
public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY = "nifi.content.repository.encryption.key";
|
||||
public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.content.repository.encryption.key.id";
|
||||
public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.content.repository.encryption.key.provider.implementation";
|
||||
public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.content.repository.encryption.key.provider.location";
|
||||
public static final String CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.content.repository.encryption.key.provider.password";
|
||||
|
||||
// flowfile repository properties
|
||||
public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation";
|
||||
|
@ -121,11 +115,6 @@ public class NiFiProperties extends ApplicationProperties {
|
|||
public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync";
|
||||
public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory";
|
||||
public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY = "nifi.flowfile.repository.encryption.key";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.flowfile.repository.encryption.key.id";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.flowfile.repository.encryption.key.provider.implementation";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.flowfile.repository.encryption.key.provider.location";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.flowfile.repository.encryption.key.provider.password";
|
||||
public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
|
||||
public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
|
||||
|
||||
|
@ -144,11 +133,6 @@ public class NiFiProperties extends ApplicationProperties {
|
|||
public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
|
||||
public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size";
|
||||
public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count";
|
||||
public static final String PROVENANCE_REPO_ENCRYPTION_KEY = "nifi.provenance.repository.encryption.key";
|
||||
public static final String PROVENANCE_REPO_ENCRYPTION_KEY_ID = "nifi.provenance.repository.encryption.key.id";
|
||||
public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.provenance.repository.encryption.key.provider.implementation";
|
||||
public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.provenance.repository.encryption.key.provider.location";
|
||||
public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD = "nifi.provenance.repository.encryption.key.provider.password";
|
||||
public static final String PROVENANCE_REPO_DEBUG_FREQUENCY = "nifi.provenance.repository.debug.frequency";
|
||||
|
||||
// status repository properties
|
||||
|
@ -1757,207 +1741,8 @@ public class NiFiProperties extends ApplicationProperties {
|
|||
&& getProperty(SECURITY_TRUSTSTORE_PASSWD) != null;
|
||||
}
|
||||
|
||||
public String getFlowFileRepoEncryptionKeyId() {
|
||||
return getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the active flowfile repository encryption key if a {@code StaticKeyProvider} is in use.
|
||||
* If no key ID is specified in the properties file, the default
|
||||
* {@code nifi.flowfile.repository.encryption.key} value is returned. If a key ID is specified in
|
||||
* {@code nifi.flowfile.repository.encryption.key.id}, it will attempt to read from
|
||||
* {@code nifi.flowfile.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key
|
||||
* ID. If that value is empty, it will use the default property
|
||||
* {@code nifi.flowfile.repository.encryption.key}.
|
||||
*
|
||||
* @return the flowfile repository encryption key in hex form
|
||||
*/
|
||||
public String getFlowFileRepoEncryptionKey() {
|
||||
String keyId = getFlowFileRepoEncryptionKeyId();
|
||||
String keyKey = StringUtils.isBlank(keyId) ? FLOWFILE_REPOSITORY_ENCRYPTION_KEY : FLOWFILE_REPOSITORY_ENCRYPTION_KEY + ".id." + keyId;
|
||||
return getProperty(keyKey, getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a
|
||||
* {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map.
|
||||
*
|
||||
* @return a Map of the keys identified by key ID
|
||||
*/
|
||||
public Map<String, String> getFlowFileRepoEncryptionKeys() {
|
||||
return getRepositoryEncryptionKeys("flowfile");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the map of key IDs to keys retrieved from the properties for the given repository type.
|
||||
*
|
||||
* @param repositoryType "provenance", "content", or "flowfile"
|
||||
* @return the key map
|
||||
*/
|
||||
public Map<String, String> getRepositoryEncryptionKeys(final String repositoryType) {
|
||||
Objects.requireNonNull(repositoryType, "Repository Type required");
|
||||
final Map<String, String> keys = new HashMap<>();
|
||||
final List<String> keyProperties = getRepositoryEncryptionKeyProperties(repositoryType);
|
||||
if (keyProperties.size() == 0) {
|
||||
logger.warn("Repository [{}] Encryption Key properties not found", repositoryType);
|
||||
return keys;
|
||||
}
|
||||
final String repositoryEncryptionKey = getRepositoryEncryptionKey(repositoryType);
|
||||
final String repositoryEncryptionKeyId = getRepositoryEncryptionKeyId(repositoryType);
|
||||
|
||||
// Retrieve the actual key values and store non-empty values in the map
|
||||
for (final String keyProperty : keyProperties) {
|
||||
final String keyValue = getProperty(keyProperty);
|
||||
if (StringUtils.isNotBlank(keyValue)) {
|
||||
// If this property is .key (the actual hex key), put it in the map under the value of .key.id (i.e. key1)
|
||||
if (keyProperty.equalsIgnoreCase(repositoryEncryptionKey)) {
|
||||
keys.put(getProperty(repositoryEncryptionKeyId), keyValue);
|
||||
} else {
|
||||
// Extract nifi.*.repository.encryption.key.id.key1 -> key1
|
||||
final String extractedKeyId = keyProperty.substring(keyProperty.lastIndexOf(".") + 1);
|
||||
if (keys.containsKey(extractedKeyId)) {
|
||||
logger.warn("Repository [{}] Duplicate Encryption Key ID [{}]: Ignoring Property [{}]", repositoryType, extractedKeyId, keyProperty);
|
||||
} else {
|
||||
keys.put(extractedKeyId, keyValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of encryption key properties for the specified repository type. If an unknown repository type
|
||||
* is provided, returns an empty list.
|
||||
*
|
||||
* @param repositoryType "provenance", "content", or "flowfile"
|
||||
* @return the list of encryption key properties
|
||||
*/
|
||||
private List<String> getRepositoryEncryptionKeyProperties(String repositoryType) {
|
||||
switch (repositoryType.toLowerCase()) {
|
||||
case "flowfile":
|
||||
return getFlowFileRepositoryEncryptionKeyProperties();
|
||||
case "content":
|
||||
return getContentRepositoryEncryptionKeyProperties();
|
||||
case "provenance":
|
||||
return getProvenanceRepositoryEncryptionKeyProperties();
|
||||
default:
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the encryption key property key for the specified repository type. If an unknown repository type
|
||||
* is provided, returns an empty string.
|
||||
*
|
||||
* @param repositoryType "provenance", "content", or "flowfile"
|
||||
* @return the encryption key property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY})
|
||||
*/
|
||||
private String getRepositoryEncryptionKey(String repositoryType) {
|
||||
switch (repositoryType.toLowerCase()) {
|
||||
case "flowfile":
|
||||
return FLOWFILE_REPOSITORY_ENCRYPTION_KEY;
|
||||
case "content":
|
||||
return CONTENT_REPOSITORY_ENCRYPTION_KEY;
|
||||
case "provenance":
|
||||
return PROVENANCE_REPO_ENCRYPTION_KEY;
|
||||
default:
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the encryption key ID property key for the specified repository type. If an unknown repository type
|
||||
* is provided, returns an empty string.
|
||||
*
|
||||
* @param repositoryType "provenance", "content", or "flowfile"
|
||||
* @return the encryption key ID property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID})
|
||||
*/
|
||||
private String getRepositoryEncryptionKeyId(String repositoryType) {
|
||||
switch (repositoryType.toLowerCase()) {
|
||||
case "flowfile":
|
||||
return FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID;
|
||||
case "content":
|
||||
return CONTENT_REPOSITORY_ENCRYPTION_KEY_ID;
|
||||
case "provenance":
|
||||
return PROVENANCE_REPO_ENCRYPTION_KEY_ID;
|
||||
default:
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
public String getProvenanceRepoEncryptionKeyId() {
|
||||
return getProperty(PROVENANCE_REPO_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the active provenance repository encryption key if a {@code StaticKeyProvider} is in use.
|
||||
* If no key ID is specified in the properties file, the default
|
||||
* {@code nifi.provenance.repository.encryption.key} value is returned. If a key ID is specified in
|
||||
* {@code nifi.provenance.repository.encryption.key.id}, it will attempt to read from
|
||||
* {@code nifi.provenance.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key
|
||||
* ID. If that value is empty, it will use the default property
|
||||
* {@code nifi.provenance.repository.encryption.key}.
|
||||
*
|
||||
* @return the provenance repository encryption key in hex form
|
||||
*/
|
||||
public String getProvenanceRepoEncryptionKey() {
|
||||
String keyId = getProvenanceRepoEncryptionKeyId();
|
||||
String keyKey = StringUtils.isBlank(keyId) ? PROVENANCE_REPO_ENCRYPTION_KEY : PROVENANCE_REPO_ENCRYPTION_KEY + ".id." + keyId;
|
||||
return getProperty(keyKey, getProperty(PROVENANCE_REPO_ENCRYPTION_KEY));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a
|
||||
* {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map.
|
||||
*
|
||||
* @return a Map of the keys identified by key ID
|
||||
*/
|
||||
public Map<String, String> getProvenanceRepoEncryptionKeys() {
|
||||
return getRepositoryEncryptionKeys("provenance");
|
||||
}
|
||||
|
||||
public String getContentRepositoryEncryptionKeyId() {
|
||||
return getProperty(CONTENT_REPOSITORY_ENCRYPTION_KEY_ID, getProperty(REPOSITORY_ENCRYPTION_KEY_ID));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the active content repository encryption key if a {@code StaticKeyProvider} is in use.
|
||||
* If no key ID is specified in the properties file, the default
|
||||
* {@code nifi.content.repository.encryption.key} value is returned. If a key ID is specified in
|
||||
* {@code nifi.content.repository.encryption.key.id}, it will attempt to read from
|
||||
* {@code nifi.content.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key
|
||||
* ID. If that value is empty, it will use the default property
|
||||
* {@code nifi.content.repository.encryption.key}.
|
||||
*
|
||||
* @return the content repository encryption key in hex form
|
||||
*/
|
||||
public String getContentRepositoryEncryptionKey() {
|
||||
String keyId = getContentRepositoryEncryptionKeyId();
|
||||
String keyKey = StringUtils.isBlank(keyId) ? CONTENT_REPOSITORY_ENCRYPTION_KEY : CONTENT_REPOSITORY_ENCRYPTION_KEY + ".id." + keyId;
|
||||
return getProperty(keyKey, getProperty(CONTENT_REPOSITORY_ENCRYPTION_KEY));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a
|
||||
* {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined this method will return an empty map.
|
||||
*
|
||||
* @return a Map of the keys identified by key ID
|
||||
*/
|
||||
public Map<String, String> getContentRepositoryEncryptionKeys() {
|
||||
return getRepositoryEncryptionKeys("content");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the allowed proxy hostnames (and IP addresses) as a comma-delimited string.
|
||||
* The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}.
|
||||
* <p>
|
||||
* Note: Calling {@code NiFiProperties.getProperty(NiFiProperties.WEB_PROXY_HOST)} will not normalize the hosts.
|
||||
*
|
||||
* @return the hostname(s)
|
||||
*/
|
||||
public String getAllowedHosts() {
|
||||
return StringUtils.join(getAllowedHostsAsList(), ",");
|
||||
public String getRepositoryEncryptionKeyId() {
|
||||
return getProperty(REPOSITORY_ENCRYPTION_KEY_ID);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2016,27 +1801,6 @@ public class NiFiProperties extends ApplicationProperties {
|
|||
}
|
||||
}
|
||||
|
||||
private List<String> getFlowFileRepositoryEncryptionKeyProperties() {
|
||||
// Filter all the property keys that define a key
|
||||
return getPropertyKeys().stream().filter(k ->
|
||||
k.startsWith(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(FLOWFILE_REPOSITORY_ENCRYPTION_KEY)
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<String> getProvenanceRepositoryEncryptionKeyProperties() {
|
||||
// Filter all the property keys that define a key
|
||||
return getPropertyKeys().stream().filter(k ->
|
||||
k.startsWith(PROVENANCE_REPO_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(PROVENANCE_REPO_ENCRYPTION_KEY)
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<String> getContentRepositoryEncryptionKeyProperties() {
|
||||
// Filter all the property keys that define a key
|
||||
return getPropertyKeys().stream().filter(k ->
|
||||
k.startsWith(CONTENT_REPOSITORY_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(CONTENT_REPOSITORY_ENCRYPTION_KEY)
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Long getDefaultBackPressureObjectThreshold() {
|
||||
long backPressureCount;
|
||||
try {
|
||||
|
|
|
@ -1,137 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.repository.encryption.configuration.kms;
|
||||
|
||||
import org.apache.nifi.repository.encryption.configuration.EncryptedRepositoryType;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
|
||||
import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION;
|
||||
import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD;
|
||||
import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
|
||||
import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION;
|
||||
import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD;
|
||||
import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
|
||||
import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION;
|
||||
import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD;
|
||||
|
||||
/**
|
||||
* Enumeration of configuration property names for encrypted repositories supporting backward compatibility
|
||||
*/
|
||||
enum EncryptedRepositoryProperty {
|
||||
CONTENT(
|
||||
EncryptedRepositoryType.CONTENT,
|
||||
CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
|
||||
CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION,
|
||||
CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD
|
||||
),
|
||||
|
||||
FLOWFILE(
|
||||
EncryptedRepositoryType.FLOWFILE,
|
||||
FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
|
||||
FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION,
|
||||
FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD
|
||||
),
|
||||
|
||||
PROVENANCE(
|
||||
EncryptedRepositoryType.PROVENANCE,
|
||||
PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
|
||||
PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION,
|
||||
PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD
|
||||
);
|
||||
|
||||
private EncryptedRepositoryType encryptedRepositoryType;
|
||||
|
||||
private String propertyType;
|
||||
|
||||
private String implementationClass;
|
||||
|
||||
private String location;
|
||||
|
||||
private String password;
|
||||
|
||||
EncryptedRepositoryProperty(final EncryptedRepositoryType encryptedRepositoryType,
|
||||
final String implementationClass,
|
||||
final String location,
|
||||
final String password) {
|
||||
this.encryptedRepositoryType = encryptedRepositoryType;
|
||||
this.propertyType = encryptedRepositoryType.toString().toLowerCase();
|
||||
this.implementationClass = implementationClass;
|
||||
this.location = location;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Encrypted Repository Type
|
||||
*
|
||||
* @return Encrypted Repository Type
|
||||
*/
|
||||
public EncryptedRepositoryType getEncryptedRepositoryType() {
|
||||
return encryptedRepositoryType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Property Type for resolving property names in NiFi Properties
|
||||
*
|
||||
* @return Property Type
|
||||
*/
|
||||
public String getPropertyType() {
|
||||
return propertyType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get implementation class property name
|
||||
*
|
||||
* @return Implementation class property name
|
||||
*/
|
||||
public String getImplementationClass() {
|
||||
return implementationClass;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get location property name
|
||||
*
|
||||
* @return Location property name
|
||||
*/
|
||||
public String getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get password property name
|
||||
*
|
||||
* @return Password property name
|
||||
*/
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Encrypted Repository Property from Encrypted Repository Type
|
||||
*
|
||||
* @param encryptedRepositoryType Encryption Repository Type
|
||||
* @return Encrypted Repository Property
|
||||
* @throws IllegalArgumentException Thrown when matching Encrypted Repository Type not found
|
||||
*/
|
||||
public static EncryptedRepositoryProperty fromEncryptedRepositoryType(final EncryptedRepositoryType encryptedRepositoryType) {
|
||||
return Arrays.stream(values())
|
||||
.filter(value -> value.encryptedRepositoryType == encryptedRepositoryType)
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException(encryptedRepositoryType.toString()));
|
||||
}
|
||||
}
|
|
@ -16,9 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.repository.encryption.configuration.kms;
|
||||
|
||||
import org.apache.nifi.security.kms.FileBasedKeyProvider;
|
||||
import org.apache.nifi.security.kms.KeyStoreKeyProvider;
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider;
|
||||
|
||||
/**
|
||||
* Configuration options for Repository Encryption Key Provider
|
||||
|
@ -38,17 +36,13 @@ public enum EncryptionKeyProvider {
|
|||
*
|
||||
* @param implementationClass Implementation class name
|
||||
* @return Encryption Key Provider
|
||||
* @throw IllegalArgumentException Thrown when implementation class name does not match a known class
|
||||
* @throws IllegalArgumentException Thrown when implementation class name does not match a known class
|
||||
*/
|
||||
public static EncryptionKeyProvider fromImplementationClass(final String implementationClass) {
|
||||
EncryptionKeyProvider encryptionKeyProvider;
|
||||
|
||||
if (implementationClass.endsWith(FileBasedKeyProvider.class.getSimpleName())) {
|
||||
encryptionKeyProvider = EncryptionKeyProvider.FILE_PROPERTIES;
|
||||
} else if (implementationClass.endsWith(KeyStoreKeyProvider.class.getSimpleName())) {
|
||||
if (implementationClass.endsWith(KeyStoreKeyProvider.class.getSimpleName())) {
|
||||
encryptionKeyProvider = EncryptionKeyProvider.KEYSTORE;
|
||||
} else if (implementationClass.endsWith(StaticKeyProvider.class.getSimpleName())) {
|
||||
encryptionKeyProvider = EncryptionKeyProvider.NIFI_PROPERTIES;
|
||||
} else {
|
||||
final String message = String.format("Key Provider Class [%s] not supported", implementationClass);
|
||||
throw new IllegalArgumentException(message);
|
||||
|
|
|
@ -16,27 +16,18 @@
|
|||
*/
|
||||
package org.apache.nifi.repository.encryption.configuration.kms;
|
||||
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.nifi.repository.encryption.configuration.EncryptedRepositoryType;
|
||||
import org.apache.nifi.security.kms.KeyProvider;
|
||||
import org.apache.nifi.security.kms.KeyProviderFactory;
|
||||
import org.apache.nifi.security.kms.configuration.FileBasedKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyStoreKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.StaticKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.util.KeyStoreUtils;
|
||||
import org.apache.nifi.security.util.KeystoreType;
|
||||
import org.apache.nifi.security.util.TlsException;
|
||||
import org.apache.nifi.util.NiFiBootstrapUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.apache.nifi.util.NiFiProperties.REPOSITORY_ENCRYPTION_KEY_PROVIDER;
|
||||
|
@ -47,8 +38,6 @@ import static org.apache.nifi.util.NiFiProperties.REPOSITORY_ENCRYPTION_KEY_PROV
|
|||
* Standard implementation of Repository Key Provider Factory supporting shared and fallback properties
|
||||
*/
|
||||
public class StandardRepositoryKeyProviderFactory implements RepositoryKeyProviderFactory {
|
||||
private static final String ROOT_KEY_ALGORITHM = "AES";
|
||||
|
||||
/**
|
||||
* Get Key Provider for specified Encrypted Repository Type using shared and fallback NiFi Properties
|
||||
*
|
||||
|
@ -60,61 +49,36 @@ public class StandardRepositoryKeyProviderFactory implements RepositoryKeyProvid
|
|||
public KeyProvider getKeyProvider(final EncryptedRepositoryType encryptedRepositoryType, final NiFiProperties niFiProperties) {
|
||||
Objects.requireNonNull(encryptedRepositoryType, "Encrypted Repository Type required");
|
||||
Objects.requireNonNull(niFiProperties, "NiFi Properties required");
|
||||
final EncryptedRepositoryProperty encryptedRepositoryProperty = EncryptedRepositoryProperty.fromEncryptedRepositoryType(encryptedRepositoryType);
|
||||
final EncryptionKeyProvider encryptionKeyProvider = getEncryptionKeyProvider(encryptedRepositoryProperty, niFiProperties);
|
||||
final KeyProviderConfiguration<?> keyProviderConfiguration = getKeyProviderConfiguration(encryptedRepositoryProperty, encryptionKeyProvider, niFiProperties);
|
||||
final EncryptionKeyProvider encryptionKeyProvider = getEncryptionKeyProvider(encryptedRepositoryType, niFiProperties);
|
||||
final KeyProviderConfiguration<?> keyProviderConfiguration = getKeyProviderConfiguration(encryptionKeyProvider, niFiProperties);
|
||||
return KeyProviderFactory.getKeyProvider(keyProviderConfiguration);
|
||||
}
|
||||
|
||||
private EncryptionKeyProvider getEncryptionKeyProvider(final EncryptedRepositoryProperty encryptedRepositoryProperty, final NiFiProperties niFiProperties) {
|
||||
EncryptionKeyProvider encryptionKeyProvider;
|
||||
private EncryptionKeyProvider getEncryptionKeyProvider(final EncryptedRepositoryType encryptedRepositoryType, final NiFiProperties niFiProperties) {
|
||||
final String sharedKeyProvider = niFiProperties.getProperty(REPOSITORY_ENCRYPTION_KEY_PROVIDER);
|
||||
|
||||
if (StringUtils.isBlank(sharedKeyProvider)) {
|
||||
final String classProperty = encryptedRepositoryProperty.getImplementationClass();
|
||||
final String implementationClass = niFiProperties.getProperty(classProperty);
|
||||
if (StringUtils.isBlank(implementationClass)) {
|
||||
final String message = String.format("Key Provider Property [%s] not configured", classProperty);
|
||||
throw new EncryptedConfigurationException(message);
|
||||
} else {
|
||||
encryptionKeyProvider = EncryptionKeyProvider.fromImplementationClass(implementationClass);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
encryptionKeyProvider = EncryptionKeyProvider.valueOf(sharedKeyProvider);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
final EncryptedRepositoryType encryptedRepositoryType = encryptedRepositoryProperty.getEncryptedRepositoryType();
|
||||
final String message = String.format("Key Provider [%s] not supported for Repository Type [%s] ", sharedKeyProvider, encryptedRepositoryType);
|
||||
throw new EncryptedConfigurationException(message);
|
||||
}
|
||||
}
|
||||
|
||||
if (encryptionKeyProvider == null) {
|
||||
final EncryptedRepositoryType encryptedRepositoryType = encryptedRepositoryProperty.getEncryptedRepositoryType();
|
||||
final String message = String.format("Key Provider [%s] not found for Repository Type [%s] ", sharedKeyProvider, encryptedRepositoryType);
|
||||
final String message = String.format("Key Provider [%s] not configured for Repository Type [%s] ", sharedKeyProvider, encryptedRepositoryType);
|
||||
throw new EncryptedConfigurationException(message);
|
||||
}
|
||||
|
||||
return encryptionKeyProvider;
|
||||
try {
|
||||
return EncryptionKeyProvider.valueOf(sharedKeyProvider);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
final String message = String.format("Key Provider [%s] not supported for Repository Type [%s] ", sharedKeyProvider, encryptedRepositoryType);
|
||||
throw new EncryptedConfigurationException(message);
|
||||
}
|
||||
}
|
||||
|
||||
private KeyProviderConfiguration<?> getKeyProviderConfiguration(final EncryptedRepositoryProperty encryptedRepositoryProperty,
|
||||
final EncryptionKeyProvider encryptionKeyProvider,
|
||||
private KeyProviderConfiguration<?> getKeyProviderConfiguration(final EncryptionKeyProvider encryptionKeyProvider,
|
||||
final NiFiProperties niFiProperties) {
|
||||
if (EncryptionKeyProvider.NIFI_PROPERTIES == encryptionKeyProvider) {
|
||||
final Map<String, String> encryptionKeys = niFiProperties.getRepositoryEncryptionKeys(encryptedRepositoryProperty.getPropertyType());
|
||||
return new StaticKeyProviderConfiguration(encryptionKeys);
|
||||
} else if (EncryptionKeyProvider.FILE_PROPERTIES == encryptionKeyProvider) {
|
||||
final SecretKey rootKey = getRootKey();
|
||||
final String location = niFiProperties.getProperty(encryptedRepositoryProperty.getLocation());
|
||||
return new FileBasedKeyProviderConfiguration(location, rootKey);
|
||||
} else if (EncryptionKeyProvider.KEYSTORE == encryptionKeyProvider) {
|
||||
final String providerPassword = getProviderPassword(encryptedRepositoryProperty, niFiProperties);
|
||||
if (EncryptionKeyProvider.KEYSTORE == encryptionKeyProvider) {
|
||||
final String providerPassword = niFiProperties.getProperty(REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_PASSWORD);
|
||||
if (StringUtils.isBlank(providerPassword)) {
|
||||
throw new EncryptedConfigurationException("Key Provider Password not configured");
|
||||
}
|
||||
final char[] keyStorePassword = providerPassword.toCharArray();
|
||||
final String location = getProviderLocation(encryptedRepositoryProperty, niFiProperties);
|
||||
final String location = niFiProperties.getProperty(REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_LOCATION);
|
||||
final KeystoreType keystoreType = KeyStoreUtils.getKeystoreTypeFromExtension(location);
|
||||
try {
|
||||
final KeyStore keyStore = KeyStoreUtils.loadSecretKeyStore(location, keyStorePassword, keystoreType.getType());
|
||||
|
@ -126,23 +90,4 @@ public class StandardRepositoryKeyProviderFactory implements RepositoryKeyProvid
|
|||
throw new UnsupportedOperationException(String.format("Key Provider [%s] not supported", encryptionKeyProvider));
|
||||
}
|
||||
}
|
||||
|
||||
private String getProviderLocation(final EncryptedRepositoryProperty encryptedRepositoryProperty, final NiFiProperties niFiProperties) {
|
||||
final String providerLocation = niFiProperties.getProperty(REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_LOCATION);
|
||||
return niFiProperties.getProperty(encryptedRepositoryProperty.getLocation(), providerLocation);
|
||||
}
|
||||
|
||||
private String getProviderPassword(final EncryptedRepositoryProperty encryptedRepositoryProperty, final NiFiProperties niFiProperties) {
|
||||
final String providerPassword = niFiProperties.getProperty(REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_PASSWORD);
|
||||
return niFiProperties.getProperty(encryptedRepositoryProperty.getPassword(), providerPassword);
|
||||
}
|
||||
|
||||
private static SecretKey getRootKey() {
|
||||
try {
|
||||
String rootKeyHex = NiFiBootstrapUtils.extractKeyFromBootstrapFile();
|
||||
return new SecretKeySpec(Hex.decodeHex(rootKeyHex), ROOT_KEY_ALGORITHM);
|
||||
} catch (final IOException | DecoderException e) {
|
||||
throw new EncryptedConfigurationException("Read Root Key from Bootstrap Failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.repository.encryption.configuration.kms;
|
|||
import org.apache.nifi.repository.encryption.configuration.EncryptedRepositoryType;
|
||||
import org.apache.nifi.security.kms.KeyProvider;
|
||||
import org.apache.nifi.security.kms.KeyStoreKeyProvider;
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -70,9 +69,7 @@ public class StandardRepositoryKeyProviderFactoryTest {
|
|||
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null);
|
||||
final EncryptedRepositoryType encryptedRepositoryType = EncryptedRepositoryType.CONTENT;
|
||||
|
||||
final EncryptedConfigurationException exception = assertThrows(EncryptedConfigurationException.class, () ->
|
||||
factory.getKeyProvider(encryptedRepositoryType, niFiProperties));
|
||||
assertTrue(exception.getMessage().contains(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS));
|
||||
assertThrows(EncryptedConfigurationException.class, () -> factory.getKeyProvider(encryptedRepositoryType, niFiProperties));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -107,45 +104,6 @@ public class StandardRepositoryKeyProviderFactoryTest {
|
|||
assertKeyProviderConfigured(KeyStoreKeyProvider.class, EncryptedRepositoryType.CONTENT, niFiProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyProviderContentStaticKeyProvider() {
|
||||
final Map<String, String> properties = new HashMap<>();
|
||||
|
||||
final Class<?> providerClass = StaticKeyProvider.class;
|
||||
properties.put(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, providerClass.getName());
|
||||
properties.put(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY, KEY);
|
||||
properties.put(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_ID, KEY_ID);
|
||||
|
||||
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
|
||||
assertKeyProviderConfigured(providerClass, EncryptedRepositoryType.CONTENT, niFiProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyProviderFlowFileStaticKeyProvider() {
|
||||
final Map<String, String> properties = new HashMap<>();
|
||||
|
||||
final Class<?> providerClass = StaticKeyProvider.class;
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, providerClass.getName());
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, KEY);
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, KEY_ID);
|
||||
|
||||
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
|
||||
assertKeyProviderConfigured(providerClass, EncryptedRepositoryType.FLOWFILE, niFiProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyProviderProvenanceStaticKeyProvider() {
|
||||
final Map<String, String> properties = new HashMap<>();
|
||||
|
||||
final Class<?> providerClass = StaticKeyProvider.class;
|
||||
properties.put(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, providerClass.getName());
|
||||
properties.put(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY, KEY);
|
||||
properties.put(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID, KEY_ID);
|
||||
|
||||
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
|
||||
assertKeyProviderConfigured(providerClass, EncryptedRepositoryType.PROVENANCE, niFiProperties);
|
||||
}
|
||||
|
||||
private void assertKeyProviderConfigured(final Class<?> providerClass, final EncryptedRepositoryType encryptedRepositoryType, final NiFiProperties niFiProperties) {
|
||||
final KeyProvider keyProvider = factory.getKeyProvider(encryptedRepositoryType, niFiProperties);
|
||||
assertNotNull(keyProvider);
|
||||
|
|
|
@ -21,10 +21,4 @@
|
|||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-security-kms</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms;
|
||||
|
||||
import org.apache.nifi.security.kms.reader.StandardFileBasedKeyReader;
|
||||
import org.apache.nifi.security.kms.reader.FileBasedKeyReader;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
/**
|
||||
* File Based Key Provider reads encrypted Secret Keys from a properties file containing one or more entries
|
||||
*/
|
||||
public class FileBasedKeyProvider extends StaticKeyProvider {
|
||||
private static final FileBasedKeyReader READER = new StandardFileBasedKeyReader();
|
||||
|
||||
public FileBasedKeyProvider(final Path location, final SecretKey rootKey) {
|
||||
super(READER.readSecretKeys(location, rootKey));
|
||||
}
|
||||
}
|
|
@ -16,28 +16,15 @@
|
|||
*/
|
||||
package org.apache.nifi.security.kms;
|
||||
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
import org.apache.nifi.security.kms.configuration.FileBasedKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyStoreKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.StaticKeyProviderConfiguration;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.nifi.security.kms.reader.KeyReaderException;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.KeyStore;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Key Provider Factory
|
||||
*/
|
||||
public class KeyProviderFactory {
|
||||
private static final String SECRET_KEY_ALGORITHM = "AES";
|
||||
|
||||
/**
|
||||
* Get Key Provider based on Configuration
|
||||
*
|
||||
|
@ -47,20 +34,7 @@ public class KeyProviderFactory {
|
|||
public static KeyProvider getKeyProvider(final KeyProviderConfiguration<?> configuration) {
|
||||
KeyProvider keyProvider;
|
||||
|
||||
if (configuration instanceof StaticKeyProviderConfiguration) {
|
||||
final StaticKeyProviderConfiguration providerConfiguration = (StaticKeyProviderConfiguration) configuration;
|
||||
final Map<String, SecretKey> secretKeys;
|
||||
try {
|
||||
secretKeys = getSecretKeys(providerConfiguration.getKeys());
|
||||
keyProvider = new StaticKeyProvider(secretKeys);
|
||||
} catch (final DecoderException e) {
|
||||
throw new KeyReaderException("Decoding Hexadecimal Secret Keys failed", e);
|
||||
}
|
||||
} else if (configuration instanceof FileBasedKeyProviderConfiguration) {
|
||||
final FileBasedKeyProviderConfiguration providerConfiguration = (FileBasedKeyProviderConfiguration) configuration;
|
||||
final Path keyProviderPath = Paths.get(providerConfiguration.getLocation());
|
||||
keyProvider = new FileBasedKeyProvider(keyProviderPath, providerConfiguration.getRootKey());
|
||||
} else if (configuration instanceof KeyStoreKeyProviderConfiguration) {
|
||||
if (configuration instanceof KeyStoreKeyProviderConfiguration) {
|
||||
final KeyStoreKeyProviderConfiguration providerConfiguration = (KeyStoreKeyProviderConfiguration) configuration;
|
||||
final KeyStore keyStore = providerConfiguration.getKeyStore();
|
||||
keyProvider = new KeyStoreKeyProvider(keyStore, providerConfiguration.getKeyPassword());
|
||||
|
@ -70,16 +44,4 @@ public class KeyProviderFactory {
|
|||
|
||||
return keyProvider;
|
||||
}
|
||||
|
||||
private static Map<String, SecretKey> getSecretKeys(final Map<String, String> keys) throws DecoderException {
|
||||
final Map<String, SecretKey> secretKeys = new HashMap<>();
|
||||
|
||||
for (final Map.Entry<String, String> keyEntry : keys.entrySet()) {
|
||||
final byte[] encodedSecretKey = Hex.decodeHex(keyEntry.getValue());
|
||||
final SecretKey secretKey = new SecretKeySpec(encodedSecretKey, SECRET_KEY_ALGORITHM);
|
||||
secretKeys.put(keyEntry.getKey(), secretKey);
|
||||
}
|
||||
|
||||
return secretKeys;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import javax.crypto.SecretKey;
|
|||
/**
|
||||
* Static Key Provider stores Secret Keys in memory based on initialized configuration properties
|
||||
*/
|
||||
public class StaticKeyProvider implements KeyProvider {
|
||||
class StaticKeyProvider implements KeyProvider {
|
||||
private final Map<String, SecretKey> keys;
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms.configuration;
|
||||
|
||||
import org.apache.nifi.security.kms.FileBasedKeyProvider;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
/**
|
||||
* Configuration for File-based Key Provider
|
||||
*/
|
||||
public class FileBasedKeyProviderConfiguration implements KeyProviderConfiguration<FileBasedKeyProvider> {
|
||||
private final String location;
|
||||
|
||||
private final SecretKey rootKey;
|
||||
|
||||
public FileBasedKeyProviderConfiguration(final String location, final SecretKey rootKey) {
|
||||
this.location = location;
|
||||
this.rootKey = rootKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<FileBasedKeyProvider> getKeyProviderClass() {
|
||||
return FileBasedKeyProvider.class;
|
||||
}
|
||||
|
||||
public String getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public SecretKey getRootKey() {
|
||||
return rootKey;
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms.configuration;
|
||||
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Configuration for Static Key Provider
|
||||
*/
|
||||
public class StaticKeyProviderConfiguration implements KeyProviderConfiguration<StaticKeyProvider> {
|
||||
private final Map<String, String> keys;
|
||||
|
||||
public StaticKeyProviderConfiguration(final Map<String, String> keys) {
|
||||
this.keys = Collections.unmodifiableMap(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<StaticKeyProvider> getKeyProviderClass() {
|
||||
return StaticKeyProvider.class;
|
||||
}
|
||||
|
||||
public Map<String, String> getKeys() {
|
||||
return keys;
|
||||
}
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms.reader;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* File Based Key Reader
|
||||
*/
|
||||
public interface FileBasedKeyReader {
|
||||
/**
|
||||
* Read Secret Keys from File Path and decrypt using provided Root Key
|
||||
*
|
||||
* @param path File Path
|
||||
* @param rootKey Root Key
|
||||
* @return Map of Key Identifier to Secret key
|
||||
*/
|
||||
Map<String, SecretKey> readSecretKeys(Path path, SecretKey rootKey);
|
||||
}
|
|
@ -1,115 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms.reader;
|
||||
|
||||
import javax.crypto.BadPaddingException;
|
||||
import javax.crypto.Cipher;
|
||||
import javax.crypto.IllegalBlockSizeException;
|
||||
import javax.crypto.NoSuchPaddingException;
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.crypto.spec.GCMParameterSpec;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.security.InvalidAlgorithmParameterException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Standard File Based Key Reader reads Secret Keys from Properties files encrypted using AES-GCM with Tag Size of 128
|
||||
*/
|
||||
public class StandardFileBasedKeyReader implements FileBasedKeyReader {
|
||||
protected static final String CIPHER_ALGORITHM = "AES/GCM/NoPadding";
|
||||
|
||||
protected static final int IV_LENGTH_BYTES = 16;
|
||||
|
||||
protected static final int TAG_SIZE_BITS = 128;
|
||||
|
||||
private static final Base64.Decoder DECODER = Base64.getDecoder();
|
||||
|
||||
private static final String SECRET_KEY_ALGORITHM = "AES";
|
||||
|
||||
/**
|
||||
* Read Secret Keys using provided Root Secret Key
|
||||
*
|
||||
* @param path File Path contains a properties file with Key Identifier and Base64-encoded encrypted values
|
||||
* @param rootKey Root Secret Key
|
||||
* @return Map of Key Identifier to decrypted Secret Key
|
||||
*/
|
||||
@Override
|
||||
public Map<String, SecretKey> readSecretKeys(final Path path, final SecretKey rootKey) {
|
||||
Objects.requireNonNull(path, "Path required");
|
||||
Objects.requireNonNull(rootKey, "Root Key required");
|
||||
final Map<String, SecretKey> secretKeys = new HashMap<>();
|
||||
|
||||
final Properties properties = getProperties(path);
|
||||
for (final String keyId : properties.stringPropertyNames()) {
|
||||
final String encodedProperty = properties.getProperty(keyId);
|
||||
final SecretKey secretKey = readSecretKey(keyId, encodedProperty, rootKey);
|
||||
secretKeys.put(keyId, secretKey);
|
||||
}
|
||||
return secretKeys;
|
||||
}
|
||||
|
||||
private Properties getProperties(final Path path) {
|
||||
final Properties properties = new Properties();
|
||||
try (final FileInputStream inputStream = new FileInputStream(path.toFile())) {
|
||||
properties.load(inputStream);
|
||||
} catch (final IOException e) {
|
||||
throw new KeyReaderException(String.format("Reading Secret Keys Failed [%s]", path), e);
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
private SecretKey readSecretKey(final String keyId, final String encodedProperty, final SecretKey rootKey) {
|
||||
final byte[] encryptedProperty = DECODER.decode(encodedProperty);
|
||||
final Cipher cipher = getCipher(keyId, encryptedProperty, rootKey);
|
||||
final byte[] encryptedSecretKey = Arrays.copyOfRange(encryptedProperty, IV_LENGTH_BYTES, encryptedProperty.length);
|
||||
try {
|
||||
final byte[] secretKey = cipher.doFinal(encryptedSecretKey);
|
||||
return new SecretKeySpec(secretKey, SECRET_KEY_ALGORITHM);
|
||||
} catch (final IllegalBlockSizeException|BadPaddingException e) {
|
||||
throw new KeyReaderException(String.format("Key Identifier [%s] decryption failed", keyId), e);
|
||||
}
|
||||
}
|
||||
|
||||
private Cipher getCipher(final String keyId, final byte[] encryptedProperty, final SecretKey rootKey) {
|
||||
final byte[] initializationVector = Arrays.copyOfRange(encryptedProperty, 0, IV_LENGTH_BYTES);
|
||||
final Cipher cipher = getCipher();
|
||||
try {
|
||||
cipher.init(Cipher.DECRYPT_MODE, rootKey, new GCMParameterSpec(TAG_SIZE_BITS, initializationVector));
|
||||
} catch (final InvalidAlgorithmParameterException|InvalidKeyException e) {
|
||||
throw new KeyReaderException(String.format("Cipher initialization failed for Key Identifier [%s]", keyId), e);
|
||||
}
|
||||
return cipher;
|
||||
}
|
||||
|
||||
private Cipher getCipher() {
|
||||
try {
|
||||
return Cipher.getInstance(CIPHER_ALGORITHM);
|
||||
} catch (final NoSuchAlgorithmException|NoSuchPaddingException e) {
|
||||
throw new KeyReaderException(String.format("Cipher Algorithm [%s] initialization failed", CIPHER_ALGORITHM), e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,67 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms;
|
||||
|
||||
import org.apache.nifi.security.kms.util.SecretKeyUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class FileBasedKeyProviderTest {
|
||||
private static final String KEYS_EXTENSION = ".keys";
|
||||
|
||||
private static final String KEY_ID = UUID.randomUUID().toString();
|
||||
|
||||
@Test
|
||||
public void testGetKey() throws GeneralSecurityException, IOException {
|
||||
final SecretKey rootKey = SecretKeyUtils.getSecretKey();
|
||||
final SecretKey secretKey = SecretKeyUtils.getSecretKey();
|
||||
final Path secretKeysPath = getSecretKeysPath(rootKey, Collections.singletonMap(KEY_ID, secretKey));
|
||||
final FileBasedKeyProvider provider = new FileBasedKeyProvider(secretKeysPath, rootKey);
|
||||
|
||||
final SecretKey secretKeyFound = provider.getKey(KEY_ID);
|
||||
assertEquals(secretKey, secretKeyFound);
|
||||
assertTrue(provider.keyExists(KEY_ID));
|
||||
assertFalse(provider.getAvailableKeyIds().isEmpty());
|
||||
}
|
||||
|
||||
private Path getSecretKeysPath(final SecretKey rootKey, final Map<String, SecretKey> secretKeys) throws IOException, GeneralSecurityException {
|
||||
final Path path = Files.createTempFile(FileBasedKeyProviderTest.class.getSimpleName(), KEYS_EXTENSION);
|
||||
path.toFile().deleteOnExit();
|
||||
|
||||
final Properties properties = SecretKeyUtils.getEncryptedSecretKeys(rootKey, secretKeys);
|
||||
try (final OutputStream outputStream = new FileOutputStream(path.toFile())) {
|
||||
properties.store(outputStream, null);
|
||||
}
|
||||
|
||||
return path;
|
||||
}
|
||||
}
|
|
@ -16,21 +16,13 @@
|
|||
*/
|
||||
package org.apache.nifi.security.kms;
|
||||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.nifi.security.kms.configuration.FileBasedKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.KeyStoreKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.configuration.StaticKeyProviderConfiguration;
|
||||
import org.apache.nifi.security.kms.util.SecretKeyUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -43,28 +35,6 @@ public class KeyProviderFactoryTest {
|
|||
assertThrows(UnsupportedOperationException.class, () -> KeyProviderFactory.getKeyProvider(configuration));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetStaticKeyProvider() {
|
||||
final SecretKey secretKey = SecretKeyUtils.getSecretKey();
|
||||
final String encodedSecretKey = Hex.encodeHexString(secretKey.getEncoded());
|
||||
final Map<String, String> keys = Collections.singletonMap(SecretKey.class.getSimpleName(), encodedSecretKey);
|
||||
|
||||
final KeyProviderConfiguration<?> configuration = new StaticKeyProviderConfiguration(keys);
|
||||
final KeyProvider keyProvider = KeyProviderFactory.getKeyProvider(configuration);
|
||||
assertEquals(StaticKeyProvider.class, keyProvider.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFileBasedKeyProvider() throws IOException {
|
||||
final File file = File.createTempFile(KeyProviderFactoryTest.class.getSimpleName(), FileBasedKeyProviderConfiguration.class.getSimpleName());
|
||||
file.deleteOnExit();
|
||||
final String location = file.getAbsolutePath();
|
||||
final SecretKey rootKey = SecretKeyUtils.getSecretKey();
|
||||
final KeyProviderConfiguration<?> configuration = new FileBasedKeyProviderConfiguration(location, rootKey);
|
||||
final KeyProvider keyProvider = KeyProviderFactory.getKeyProvider(configuration);
|
||||
assertEquals(FileBasedKeyProvider.class, keyProvider.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyStoreKeyProvider() throws GeneralSecurityException, IOException {
|
||||
final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
|
|
|
@ -1,62 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.kms.reader;
|
||||
|
||||
import org.apache.nifi.security.kms.util.SecretKeyUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class StandardFileBasedKeyReaderTest {
|
||||
private static final String KEYS_EXTENSION = ".keys";
|
||||
|
||||
private static final SecretKey ROOT_KEY = SecretKeyUtils.getSecretKey();
|
||||
|
||||
@Test
|
||||
public void testReadSecretKeys() throws Exception {
|
||||
final StandardFileBasedKeyReader reader = new StandardFileBasedKeyReader();
|
||||
|
||||
final SecretKey secretKey = SecretKeyUtils.getSecretKey();
|
||||
final String keyId = SecretKey.class.getSimpleName();
|
||||
|
||||
final Path path = getSecretKeysPath(ROOT_KEY, Collections.singletonMap(keyId, secretKey));
|
||||
final Map<String, SecretKey> secretKeys = reader.readSecretKeys(path, ROOT_KEY);
|
||||
final SecretKey readSecretKey = secretKeys.get(keyId);
|
||||
assertEquals(secretKey, readSecretKey, "Secret Key not matched");
|
||||
}
|
||||
|
||||
private Path getSecretKeysPath(final SecretKey rootKey, final Map<String, SecretKey> secretKeys) throws Exception {
|
||||
final Path path = Files.createTempFile(StandardFileBasedKeyReaderTest.class.getSimpleName(), KEYS_EXTENSION);
|
||||
path.toFile().deleteOnExit();
|
||||
|
||||
final Properties properties = SecretKeyUtils.getEncryptedSecretKeys(rootKey, secretKeys);
|
||||
try (final OutputStream outputStream = new FileOutputStream(path.toFile())) {
|
||||
properties.store(outputStream, null);
|
||||
}
|
||||
|
||||
return path;
|
||||
}
|
||||
}
|
|
@ -16,52 +16,16 @@
|
|||
*/
|
||||
package org.apache.nifi.security.kms.util;
|
||||
|
||||
import javax.crypto.Cipher;
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.crypto.spec.GCMParameterSpec;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Base64;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class SecretKeyUtils {
|
||||
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
|
||||
|
||||
private static final Base64.Encoder ENCODER = Base64.getEncoder();
|
||||
|
||||
private static final String CIPHER_ALGORITHM = "AES/GCM/NoPadding";
|
||||
|
||||
private static final String KEY_ALGORITHM = "AES";
|
||||
|
||||
private static final int KEY_LENGTH = 32;
|
||||
|
||||
private static final int IV_LENGTH = 16;
|
||||
|
||||
private static final int TAG_LENGTH = 128;
|
||||
|
||||
/**
|
||||
* Get Encrypted Secret Keys as Properties
|
||||
*
|
||||
* @param rootKey Root Key used to encrypt Secret Keys
|
||||
* @param secretKeys Map of Key Identifier to Secret Key
|
||||
* @return Properties containing encrypted Secret Keys
|
||||
* @throws GeneralSecurityException Thrown on getEncryptedSecretKey()
|
||||
*/
|
||||
public static Properties getEncryptedSecretKeys(final SecretKey rootKey, final Map<String, SecretKey> secretKeys) throws GeneralSecurityException {
|
||||
final Properties properties = new Properties();
|
||||
for (final Map.Entry<String, SecretKey> secretKeyEntry : secretKeys.entrySet()) {
|
||||
final SecretKey secretKey = secretKeyEntry.getValue();
|
||||
final String encryptedSecretKey = getEncryptedSecretKey(rootKey, secretKey);
|
||||
properties.setProperty(secretKeyEntry.getKey(), encryptedSecretKey);
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Random AES Secret Key
|
||||
*
|
||||
|
@ -72,31 +36,4 @@ public class SecretKeyUtils {
|
|||
SECURE_RANDOM.nextBytes(encodedKey);
|
||||
return new SecretKeySpec(encodedKey, KEY_ALGORITHM);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Encrypted Secret Key using AES-GCM with Base64 encoded string prefixed with initialization vector
|
||||
*
|
||||
* @param rootKey Root Key used to encrypt Secret Key
|
||||
* @param secretKey Secret Key to be encrypted
|
||||
* @return Base64 encoded and encrypted Secret Key
|
||||
* @throws GeneralSecurityException Thrown when unable to encrypt Secret Key
|
||||
*/
|
||||
private static String getEncryptedSecretKey(final SecretKey rootKey, final SecretKey secretKey) throws GeneralSecurityException {
|
||||
final Cipher cipher = Cipher.getInstance(CIPHER_ALGORITHM);
|
||||
|
||||
final byte[] initializationVector = new byte[IV_LENGTH];
|
||||
SECURE_RANDOM.nextBytes(initializationVector);
|
||||
cipher.init(Cipher.ENCRYPT_MODE, rootKey, new GCMParameterSpec(TAG_LENGTH, initializationVector));
|
||||
final byte[] encryptedSecretKey = cipher.doFinal(secretKey.getEncoded());
|
||||
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
try {
|
||||
outputStream.write(initializationVector);
|
||||
outputStream.write(encryptedSecretKey);
|
||||
} catch (final IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
final byte[] encryptedProperty = outputStream.toByteArray();
|
||||
return ENCODER.encodeToString(encryptedProperty);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3546,25 +3546,6 @@ implementation.
|
|||
|`nifi.flowfile.repository.always.sync`|If set to `true`, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is `false`, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is `false`.
|
||||
|====
|
||||
|
||||
[[encrypted-write-ahead-flowfile-repository-properties]]
|
||||
=== Encrypted Write Ahead FlowFile Repository Properties
|
||||
|
||||
WARNING: The following properties are deprecated in favor of <<repository-encryption-properties>> properties.
|
||||
|
||||
All of the properties defined above (see <<write-ahead-flowfile-repository,Write Ahead FlowFile Repository>>) still apply. Only encryption-specific properties are listed here. See <<user-guide.adoc#encrypted-flowfile,Encrypted FlowFile Repository in the User Guide>> for more information.
|
||||
|
||||
NOTE: Unlike the encrypted content and provenance repositories, the repository implementation does not change here, only the _underlying write-ahead log implementation_. This allows for cleaner separation and more flexibility in implementation selection. The property that should be changed to enable encryption is `nifi.flowfile.repository.wal.implementation`.
|
||||
|
||||
|====
|
||||
|*Property*|*Description*
|
||||
|`nifi.flowfile.repository.encryption.key.provider.implementation`|This is the fully-qualified class name of the **key provider**. A key provider is the datastore interface for accessing the encryption key to protect the content claims. There are currently three implementations: `StaticKeyProvider` which reads a key directly from _nifi.properties_, `FileBasedKeyProvider` which reads keys from an encrypted file, and `KeyStoreKeyProvider` which reads keys from a standard `java.security.KeyStore`.
|
||||
|`nifi.flowfile.repository.encryption.key.provider.location`|The path to the key definition resource (empty for `StaticKeyProvider`, `./keys.nkp` or similar path for `FileBasedKeyProvider`). For future providers like an HSM, this may be a connection string or URL.
|
||||
|`nifi.flowfile.repository.encryption.key.provider.password`|The password used for decrypting the key definition resource, such as the keystore for `KeyStoreKeyProvider`.
|
||||
|`nifi.flowfile.repository.encryption.key.id`|The active key ID to use for encryption (e.g. `Key1`).
|
||||
|`nifi.flowfile.repository.encryption.key`|The key to use for `StaticKeyProvider`. The key format is hex-encoded (`0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210`) but can also be encrypted using the `./encrypt-config.sh` tool in NiFi Toolkit (see the <<toolkit-guide.adoc#encrypt_config_tool,Encrypt-Config Tool>> section in the link:toolkit-guide.html[NiFi Toolkit Guide] for more information).
|
||||
|`nifi.flowfile.repository.encryption.key.id.`*|Allows for additional keys to be specified for the `StaticKeyProvider`. For example, the line `nifi.flowfile.repository.encryption.key.id.Key2=012...210` would provide an available key `Key2`.
|
||||
|====
|
||||
|
||||
=== Volatile FlowFile Repository
|
||||
|
||||
This implementation stores FlowFiles in memory instead of on disk. It *will* result in data loss in the event of power/machine failure or a restart of NiFi. To use this implementation, set `nifi.flowfile.repository.implementation` to `org.apache.nifi.controller.repository.VolatileFlowFileRepository`.
|
||||
|
@ -3635,23 +3616,6 @@ For example, if `nifi.content.repository.archive.max.usage.percentage` is `50%`
|
|||
|`nifi.content.repository.archive.cleanup.frequency`| The frequency with which to schedule the content archive clean up task. The default value is `1 Second`. A value lower than `1 Second` is not allowed.
|
||||
|====
|
||||
|
||||
[[encrypted-file-system-content-repository-properties]]
|
||||
=== Encrypted File System Content Repository Properties
|
||||
|
||||
WARNING: The following properties are deprecated in favor of <<repository-encryption-properties>> properties.
|
||||
|
||||
All of the properties defined above (see <<file-system-content-repository-properties,File System Content Repository Properties>>) still apply. Only encryption-specific properties are listed here. See <<user-guide.adoc#encrypted-content,Encrypted Content Repository in the User Guide>> for more information.
|
||||
|
||||
|====
|
||||
|*Property*|*Description*
|
||||
|`nifi.content.repository.encryption.key.provider.implementation`|This is the fully-qualified class name of the **key provider**. A key provider is the datastore interface for accessing the encryption key to protect the content claims. There are currently three implementations: `StaticKeyProvider` which reads a key directly from _nifi.properties_, `FileBasedKeyProvider` which reads keys from an encrypted file, and `KeyStoreKeyProvider` which reads keys from a standard `java.security.KeyStore`.
|
||||
|`nifi.content.repository.encryption.key.provider.location`|The path to the key definition resource (empty for `StaticKeyProvider`, `./keys.nkp` or similar path for `FileBasedKeyProvider`). For future providers like an HSM, this may be a connection string or URL.
|
||||
|`nifi.content.repository.encryption.key.provider.password`|The password used for decrypting the key definition resource, such as the keystore for `KeyStoreKeyProvider`.
|
||||
|`nifi.content.repository.encryption.key.id`|The active key ID to use for encryption (e.g. `Key1`).
|
||||
|`nifi.content.repository.encryption.key`|The key to use for `StaticKeyProvider`. The key format is hex-encoded (`0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210`) but can also be encrypted using the `./encrypt-config.sh` tool in NiFi Toolkit (see the <<toolkit-guide.adoc#encrypt_config_tool,Encrypt-Config Tool>> section in the link:toolkit-guide.html[NiFi Toolkit Guide] for more information).
|
||||
|`nifi.content.repository.encryption.key.id.`*|Allows for additional keys to be specified for the `StaticKeyProvider`. For example, the line `nifi.content.repository.encryption.key.id.Key2=012...210` would provide an available key `Key2`.
|
||||
|====
|
||||
|
||||
=== Provenance Repository
|
||||
|
||||
The Provenance Repository contains the information related to Data Provenance. The next four sections are for Provenance Repository properties.
|
||||
|
@ -3730,23 +3694,6 @@ will be destroyed as well.
|
|||
are not fully utilized, this feature can result in far faster Provenance queries. The default value for this property is blank (i.e. disabled).
|
||||
|====
|
||||
|
||||
[[encrypted-write-ahead-provenance-repository-properties]]
|
||||
=== Encrypted Write Ahead Provenance Repository Properties
|
||||
|
||||
WARNING: The following properties are deprecated in favor of <<repository-encryption-properties>> properties.
|
||||
|
||||
All of the properties defined above (see <<write-ahead-provenance-repository-properties,Write Ahead Repository Properties>>) still apply. Only encryption-specific properties are listed here. See <<user-guide.adoc#encrypted-provenance,Encrypted Provenance Repository in the User Guide>> for more information.
|
||||
|
||||
|====
|
||||
|*Property*|*Description*
|
||||
|`nifi.provenance.repository.encryption.key.provider.implementation`|This is the fully-qualified class name of the **key provider**. A key provider is the datastore interface for accessing the encryption key to protect the provenance events. There are currently three implementations: `StaticKeyProvider` which reads a key directly from _nifi.properties_, `FileBasedKeyProvider` which reads keys from an encrypted file, and `KeyStoreKeyProvider` which reads keys from a standard `java.security.KeyStore`.
|
||||
|`nifi.provenance.repository.encryption.key.provider.location`|The path to the key definition resource (empty for `StaticKeyProvider`, `./keys.nkp` or similar path for `FileBasedKeyProvider`). For future providers like an HSM, this may be a connection string or URL.
|
||||
|`nifi.provenance.repository.encryption.key.provider.password`|The password used for decrypting the key definition resource, such as the keystore for `KeyStoreKeyProvider`.
|
||||
|`nifi.provenance.repository.encryption.key.id`|The active key ID to use for encryption (e.g. `Key1`).
|
||||
|`nifi.provenance.repository.encryption.key`|The key to use for `StaticKeyProvider`. The key format is hex-encoded (`0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210`) but can also be encrypted using the `./encrypt-config.sh` tool in NiFi Toolkit (see the <<toolkit-guide.adoc#encrypt_config_tool,Encrypt-Config Tool>> section in the link:toolkit-guide.html[NiFi Toolkit Guide] for more information).
|
||||
|`nifi.provenance.repository.encryption.key.id.`*|Allows for additional keys to be specified for the `StaticKeyProvider`. For example, the line `nifi.provenance.repository.encryption.key.id.Key2=012...210` would provide an available key `Key2`.
|
||||
|====
|
||||
|
||||
=== Persistent Provenance Repository Properties
|
||||
|
||||
|====
|
||||
|
|
|
@ -63,7 +63,7 @@ public class EncryptedSchemaRepositoryRecordSerde implements SerDe<SerializedRep
|
|||
final RepositoryKeyProviderFactory repositoryKeyProviderFactory = new StandardRepositoryKeyProviderFactory();
|
||||
final KeyProvider keyProvider = repositoryKeyProviderFactory.getKeyProvider(EncryptedRepositoryType.FLOWFILE, niFiProperties);
|
||||
this.encryptor = new AesGcmByteArrayRepositoryEncryptor(keyProvider, EncryptionMetadataHeader.FLOWFILE);
|
||||
this.keyId = niFiProperties.getFlowFileRepoEncryptionKeyId();
|
||||
this.keyId = niFiProperties.getRepositoryEncryptionKeyId();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -59,7 +59,7 @@ public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
|
|||
super(nifiProperties);
|
||||
final RepositoryKeyProviderFactory repositoryKeyProviderFactory = new StandardRepositoryKeyProviderFactory();
|
||||
final KeyProvider keyProvider = repositoryKeyProviderFactory.getKeyProvider(EncryptedRepositoryType.FLOWFILE, nifiProperties);
|
||||
final String keyId = nifiProperties.getFlowFileRepoEncryptionKeyId();
|
||||
final String keyId = nifiProperties.getRepositoryEncryptionKeyId();
|
||||
this.secretKey = keyProvider.getKey(keyId);
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ public class EncryptedFileSystemRepository extends FileSystemRepository {
|
|||
final RepositoryKeyProviderFactory repositoryKeyProviderFactory = new StandardRepositoryKeyProviderFactory();
|
||||
final KeyProvider keyProvider = repositoryKeyProviderFactory.getKeyProvider(EncryptedRepositoryType.CONTENT, niFiProperties);
|
||||
repositoryEncryptor = new AesCtrStreamRepositoryEncryptor(keyProvider, EncryptionMetadataHeader.CONTENT);
|
||||
keyId = Objects.requireNonNull(niFiProperties.getContentRepositoryEncryptionKeyId(), "Key Identifier required");
|
||||
keyId = Objects.requireNonNull(niFiProperties.getRepositoryEncryptionKeyId(), "Key Identifier required");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,628 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.repository.crypto
|
||||
|
||||
import org.apache.nifi.controller.repository.StandardContentRepositoryContext
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim
|
||||
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
|
||||
import org.apache.nifi.controller.repository.util.DiskUtils
|
||||
import org.apache.nifi.events.EventReporter
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.bouncycastle.util.encoders.Hex
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.condition.DisabledOnOs
|
||||
import org.junit.jupiter.api.condition.OS
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Path
|
||||
import java.security.Security
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals
|
||||
|
||||
@DisabledOnOs(OS.WINDOWS)
|
||||
class EncryptedFileSystemRepositoryTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EncryptedFileSystemRepositoryTest.class)
|
||||
|
||||
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
||||
private static final String KEY_HEX_1 = KEY_HEX_128 * 2
|
||||
private static final String KEY_ID_1 = "K1"
|
||||
|
||||
private EncryptedFileSystemRepository repository = null
|
||||
private final File rootFile = new File("target/content_repository")
|
||||
private NiFiProperties nifiProperties
|
||||
|
||||
private static final boolean isLossTolerant = false
|
||||
|
||||
private static final String DEFAULT_NIFI_PROPS_PATH = "/conf/nifi.properties"
|
||||
|
||||
private static final Map<String, String> DEFAULT_ENCRYPTION_PROPS = [
|
||||
(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION) : "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository",
|
||||
(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_ID) : KEY_ID_1,
|
||||
(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY) : KEY_HEX_1,
|
||||
(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
|
||||
(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION) : ""
|
||||
]
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.debug("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
// Use mock NiFiProperties w/ encrypted configs
|
||||
repository = initializeRepository()
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to set up an encrypted content repository.
|
||||
*
|
||||
* @param nifiPropertiesPath the actual NiFi properties path
|
||||
* @param additionalProperties overriding properties for the ECR
|
||||
* @return the initialized repository
|
||||
*/
|
||||
private EncryptedFileSystemRepository initializeRepository(String nifiPropertiesPath = DEFAULT_NIFI_PROPS_PATH, Map<String, String> additionalProperties = DEFAULT_ENCRYPTION_PROPS) {
|
||||
nifiProperties = NiFiProperties.createBasicNiFiProperties(EncryptedFileSystemRepositoryTest.class.getResource(nifiPropertiesPath).path, additionalProperties)
|
||||
if (rootFile.exists()) {
|
||||
DiskUtils.deleteRecursively(rootFile)
|
||||
}
|
||||
|
||||
EncryptedFileSystemRepository repository = new EncryptedFileSystemRepository(nifiProperties)
|
||||
StandardResourceClaimManager claimManager = new StandardResourceClaimManager()
|
||||
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP))
|
||||
repository.purge()
|
||||
logger.info("Created EFSR with nifi.properties [${nifiPropertiesPath}] and ${additionalProperties.size()} additional properties: ${additionalProperties}")
|
||||
|
||||
repository
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() throws Exception {
|
||||
repository.shutdown()
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadNullContentClaimShouldReturnEmptyInputStream() {
|
||||
final InputStream inputStream = repository.read((ContentClaim) null)
|
||||
final int read = inputStream.read()
|
||||
assertEquals(-1, read)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to write encrypted content to the repository, independently read the persisted file to ensure the content is encrypted, and then retrieve & decrypt via the repository.
|
||||
*/
|
||||
@Test
|
||||
void testShouldEncryptAndDecrypt() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
String plainContent = "hello"
|
||||
byte[] plainBytes = plainContent.bytes
|
||||
logger.info("Writing \"${plainContent}\" (${plainContent.length()}): ${Hex.toHexString(plainBytes)}")
|
||||
|
||||
// Act
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
// Assert
|
||||
|
||||
// Use the EFSR to decrypt the same content
|
||||
byte [] retrievedBytes = verifyClaimDecryption(claim, plainBytes)
|
||||
assertEquals(plainContent, new String(retrievedBytes, StandardCharsets.UTF_8))
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to write encrypted image content to the repository, independently read the persisted file to ensure the content is encrypted, and then retrieve & decrypt via the repository.
|
||||
*/
|
||||
@Test
|
||||
void testShouldEncryptAndDecryptImage() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File image = new File("src/test/resources/encrypted_content_repo.png")
|
||||
byte[] plainBytes = image.bytes
|
||||
logger.info("Writing \"${image.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
// Act
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
// Assert
|
||||
|
||||
// Use the EFSR to decrypt the same content
|
||||
verifyClaimDecryption(claim, plainBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to write multiple pieces of encrypted content to the repository and then retrieve & decrypt via the repository.
|
||||
*/
|
||||
@Test
|
||||
void testShouldEncryptAndDecryptMultipleRecords() {
|
||||
def content = [
|
||||
"This is a plaintext message. ",
|
||||
"Some,csv,data\ncol1,col2,col3",
|
||||
"Easy to read 0123456789abcdef"
|
||||
]
|
||||
|
||||
def claims = createClaims(3)
|
||||
|
||||
// Act
|
||||
writeContentToClaims(formClaimMap(claims, content))
|
||||
|
||||
// Assert
|
||||
claims.eachWithIndex { ContentClaim claim, int i ->
|
||||
String pieceOfContent = content[i]
|
||||
|
||||
// Use the EFSR to decrypt the same content
|
||||
byte [] retrievedBytes = verifyClaimDecryption(claim, pieceOfContent.bytes)
|
||||
assertEquals(pieceOfContent, new String(retrievedBytes, StandardCharsets.UTF_8))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to show no blocking on uninitialized key ID to retrieve content.
|
||||
*/
|
||||
@Test
|
||||
void testReadShouldNotRequireActiveKeyId() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
String plainContent = "hello"
|
||||
byte[] plainBytes = plainContent.bytes
|
||||
|
||||
// Write the encrypted content to the repository
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
// Act
|
||||
final InputStream inputStream = repository.read(claim)
|
||||
byte[] retrievedContent = inputStream.bytes
|
||||
logger.info("Read bytes via repository (${retrievedContent.length}): ${pba(retrievedContent)}")
|
||||
|
||||
// Assert
|
||||
assertEquals(plainContent, new String(retrievedContent, StandardCharsets.UTF_8))
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to configure repository instance from nifi.properties.
|
||||
*/
|
||||
@Test
|
||||
void testConstructorShouldReadFromNiFiProperties() {
|
||||
// Arrange
|
||||
String plainContent = "hello"
|
||||
byte[] plainBytes = plainContent.bytes
|
||||
|
||||
// Remove the generic repository instance
|
||||
repository.purge()
|
||||
repository.cleanup()
|
||||
repository.shutdown()
|
||||
repository = null
|
||||
|
||||
// Act
|
||||
|
||||
// Create a new repository with the encryption properties
|
||||
repository = initializeRepository(DEFAULT_NIFI_PROPS_PATH, DEFAULT_ENCRYPTION_PROPS)
|
||||
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
// Assert
|
||||
|
||||
// Verify implicit configuration of necessary fields by encrypting and decrypting one record
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
verifyClaimDecryption(claim, plainBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when content is imported from an InputStream, it is encrypted.
|
||||
*/
|
||||
@Test
|
||||
void testImportFromInputStreamShouldEncryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File image = new File("src/test/resources/bgBannerFoot.png")
|
||||
byte[] plainBytes = image.bytes
|
||||
logger.info("Writing \"${image.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
// Act
|
||||
final long bytesRead = repository.importFrom(image.newInputStream(), claim)
|
||||
logger.info("Read ${bytesRead} bytes from ${image.name} into ${claim.resourceClaim.id}")
|
||||
|
||||
// Use the EFSR to decrypt the same content
|
||||
verifyClaimDecryption(claim, plainBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when content is imported from a path, it is encrypted.
|
||||
*/
|
||||
@Test
|
||||
void testImportFromPathShouldEncryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File image = new File("src/test/resources/bgBannerFoot.png")
|
||||
byte[] plainBytes = image.bytes
|
||||
logger.info("Writing \"${image.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
// Act
|
||||
final long bytesRead = repository.importFrom(image.toPath(), claim)
|
||||
logger.info("Read ${bytesRead} bytes from ${image.name} into ${claim.resourceClaim.id}")
|
||||
|
||||
// Use the EFSR to decrypt the same content
|
||||
verifyClaimDecryption(claim, plainBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when content is exported to an OutputStream, it is decrypted.
|
||||
*/
|
||||
@Test
|
||||
void testExportToOutputStreamShouldDecryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File image = new File("src/test/resources/bgBannerFoot.png")
|
||||
byte[] plainBytes = image.bytes
|
||||
logger.info("Writing \"${image.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
final OutputStream outputStream = new ByteArrayOutputStream()
|
||||
|
||||
// Act
|
||||
final long bytesWritten = repository.exportTo(claim, outputStream)
|
||||
logger.info("Wrote ${bytesWritten} bytes from ${claim.resourceClaim.id} into OutputStream")
|
||||
|
||||
// Independently access the output stream and verify that the content is plain text
|
||||
byte[] exportedBytes = outputStream.toByteArray()
|
||||
logger.info("Read bytes from output stream (${exportedBytes.length}): ${pba(exportedBytes)}")
|
||||
|
||||
// Assert
|
||||
assertArrayEquals(plainBytes, exportedBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when a subset of content is exported to an OutputStream, it is decrypted.
|
||||
*/
|
||||
@Test
|
||||
void testExportSubsetToOutputStreamShouldDecryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File longText = new File("src/test/resources/longtext.txt")
|
||||
byte[] plainBytes = longText.bytes
|
||||
logger.info("Writing \"${longText.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
final OutputStream outputStream = new ByteArrayOutputStream()
|
||||
|
||||
// Act
|
||||
long offset = 100
|
||||
long length = 50
|
||||
logger.info("Exporting claim ${claim} (offset: ${offset}, length: ${length}) to output stream")
|
||||
logger.info("Expecting these bytes from plain content: ${pba(plainBytes[offset..<(offset + length)] as byte[])}")
|
||||
|
||||
final long bytesWritten = repository.exportTo(claim, outputStream, offset, length)
|
||||
logger.info("Wrote ${bytesWritten} bytes from ${claim.resourceClaim.id} into OutputStream")
|
||||
|
||||
// Independently access the output stream and verify that the content is plain text
|
||||
byte[] exportedBytes = outputStream.toByteArray()
|
||||
logger.info("Read bytes from output stream (${exportedBytes.length}): ${pba(exportedBytes)}")
|
||||
|
||||
// Assert
|
||||
assertArrayEquals(plainBytes[offset..<(offset + length)] as byte[], exportedBytes)
|
||||
assertEquals(length, exportedBytes.length)
|
||||
assertEquals(length, bytesWritten)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when content is exported to a path, it is decrypted.
|
||||
*/
|
||||
@Test
|
||||
void testExportToPathShouldDecryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File image = new File("src/test/resources/bgBannerFoot.png")
|
||||
byte[] plainBytes = image.bytes
|
||||
logger.info("Writing \"${image.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
final File tempOutputFile = new File("target/exportedContent")
|
||||
final Path tempPath = tempOutputFile.toPath()
|
||||
|
||||
// Act
|
||||
final long bytesWritten = repository.exportTo(claim, tempPath, false)
|
||||
logger.info("Wrote ${bytesWritten} bytes from ${claim.resourceClaim.id} into path ${tempPath}")
|
||||
|
||||
// Independently access the path and verify that the content is plain text
|
||||
byte[] exportedBytes = tempOutputFile.bytes
|
||||
logger.info("Read bytes from path (${exportedBytes.length}): ${pba(exportedBytes)}")
|
||||
|
||||
// Assert
|
||||
try {
|
||||
assertArrayEquals(plainBytes, exportedBytes)
|
||||
} finally {
|
||||
// Clean up
|
||||
tempOutputFile.delete()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to ensure that when a subset of content is exported to a path, it is decrypted.
|
||||
*/
|
||||
@Test
|
||||
void testExportSubsetToPathShouldDecryptContent() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File longText = new File("src/test/resources/longtext.txt")
|
||||
byte[] plainBytes = longText.bytes
|
||||
logger.info("Writing \"${longText.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
final File tempOutputFile = new File("target/exportedContent")
|
||||
final Path tempPath = tempOutputFile.toPath()
|
||||
|
||||
// Act
|
||||
long offset = 100
|
||||
long length = 50
|
||||
logger.info("Exporting claim ${claim} (offset: ${offset}, length: ${length}) to output stream")
|
||||
logger.info("Expecting these bytes from plain content: ${pba(plainBytes[offset..<(offset + length)] as byte[])}")
|
||||
|
||||
final long bytesWritten = repository.exportTo(claim, tempPath, false, offset, length)
|
||||
logger.info("Wrote ${bytesWritten} bytes from ${claim.resourceClaim.id} into path ${tempPath}")
|
||||
|
||||
// Independently access the path and verify that the content is plain text
|
||||
byte[] exportedBytes = tempOutputFile.bytes
|
||||
logger.info("Read bytes from path (${exportedBytes.length}): ${pba(exportedBytes)}")
|
||||
|
||||
// Assert
|
||||
try {
|
||||
assertArrayEquals(plainBytes[offset..<(offset + length)] as byte[], exportedBytes)
|
||||
assertEquals(length, exportedBytes.length)
|
||||
assertEquals(length, bytesWritten)
|
||||
} finally {
|
||||
// Clean up
|
||||
tempOutputFile.delete()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to clone encrypted content claim and ensure that the cloned encryption metadata accurately reflects the new claim and allows for decryption.
|
||||
*/
|
||||
@Test
|
||||
void testCloneShouldUpdateEncryptionMetadata() {
|
||||
// Arrange
|
||||
final ContentClaim claim = repository.create(isLossTolerant)
|
||||
|
||||
File textFile = new File("src/test/resources/longtext.txt")
|
||||
byte[] plainBytes = textFile.bytes
|
||||
logger.info("Writing \"${textFile.name}\" (${plainBytes.length}): ${pba(plainBytes)}")
|
||||
|
||||
// Write to the content repository (encrypted)
|
||||
writeContentToClaim(claim, plainBytes)
|
||||
|
||||
// Clone the content claim
|
||||
logger.info("Preparing to clone claim ${claim}")
|
||||
ContentClaim clonedClaim = repository.clone(claim, isLossTolerant)
|
||||
logger.info("Cloned claim ${claim} to ${clonedClaim}")
|
||||
|
||||
// Use the EFSR to decrypt the original claim content
|
||||
byte [] retrievedOriginalBytes = verifyClaimDecryption(claim, plainBytes)
|
||||
assertArrayEquals(plainBytes, retrievedOriginalBytes)
|
||||
|
||||
// Use the EFSR to decrypt the cloned claim content
|
||||
byte [] retrievedClonedBytes = verifyClaimDecryption(clonedClaim, plainBytes)
|
||||
assertArrayEquals(plainBytes, retrievedClonedBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to merge two encrypted content claims and ensure that the merged encryption metadata accurately reflects the new claim and allows for decryption.
|
||||
*/
|
||||
@Test
|
||||
void testMergeShouldUpdateEncryptionMetadata() {
|
||||
// Arrange
|
||||
int claimCount = 2
|
||||
def claims = createClaims(claimCount, isLossTolerant)
|
||||
|
||||
File textFile = new File("src/test/resources/longtext.txt")
|
||||
byte[] plainBytes = textFile.bytes
|
||||
String plainContent = textFile.text
|
||||
|
||||
// Split the long text into two claims
|
||||
def content = splitTextIntoSections(plainContent, claimCount)
|
||||
|
||||
// Write each piece of content to the respective claim
|
||||
writeContentToClaims(formClaimMap(claims, content))
|
||||
|
||||
// Merge the two content claims
|
||||
logger.info("Preparing to merge claims ${claims}")
|
||||
ContentClaim mergedClaim = repository.create(isLossTolerant)
|
||||
// The header, footer, and demarcator are null in this case
|
||||
long bytesWrittenDuringMerge = repository.merge(claims, mergedClaim, null, null, null)
|
||||
logger.info("Merged ${claims.size()} claims (${bytesWrittenDuringMerge} bytes) to ${mergedClaim}")
|
||||
|
||||
// Use the EFSR to decrypt the original claims content
|
||||
claims.eachWithIndex { ContentClaim claim, int i ->
|
||||
verifyClaimDecryption(claim, content[i].bytes)
|
||||
}
|
||||
|
||||
// Use the EFSR to decrypt the merged claim content
|
||||
verifyClaimDecryption(mergedClaim, plainBytes, "merged")
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test to merge encrypted content claims and ensure that the merged encryption
|
||||
* metadata accurately reflects the new claim and allows for decryption,
|
||||
* including the header, demarcator, and footer.
|
||||
*/
|
||||
@Test
|
||||
void testMergeWithMarkersShouldUpdateEncryptionMetadata() {
|
||||
// Arrange
|
||||
int claimCount = 4
|
||||
def claims = createClaims(claimCount, isLossTolerant)
|
||||
|
||||
File textFile = new File("src/test/resources/longtext.txt")
|
||||
String plainContent = textFile.text
|
||||
|
||||
// Split the long text into two claims
|
||||
List<String> content = splitTextIntoSections(plainContent, claimCount)
|
||||
|
||||
// Write each piece of content to the respective claim
|
||||
writeContentToClaims(formClaimMap(claims, content))
|
||||
|
||||
// Define the markers
|
||||
String header = "---Header---\n"
|
||||
String demarcator = "\n---Boundary---\n"
|
||||
String footer = "\n---Footer---"
|
||||
final String EXPECTED_MERGED_CONTENT = header + content.join(demarcator) + footer
|
||||
|
||||
// Merge the content claims
|
||||
logger.info("Preparing to merge claims ${claims}")
|
||||
ContentClaim mergedClaim = repository.create(isLossTolerant)
|
||||
// The header, footer, and demarcator are populated in this case
|
||||
long bytesWrittenDuringMerge = repository.merge(claims, mergedClaim, header.bytes, footer.bytes, demarcator.bytes)
|
||||
logger.info("Merged ${claims.size()} claims (${bytesWrittenDuringMerge} bytes) to ${mergedClaim}")
|
||||
|
||||
// Use the EFSR to decrypt the original claims content
|
||||
claims.eachWithIndex { ContentClaim claim, int i ->
|
||||
verifyClaimDecryption(claim, content[i].bytes)
|
||||
}
|
||||
|
||||
// Use the EFSR to decrypt the merged claim content
|
||||
verifyClaimDecryption(mergedClaim, EXPECTED_MERGED_CONTENT.bytes, "merged")
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code List<String>} with length {@code N}, where N is the number of elements requested. Each element
|
||||
* will be roughly the same size.
|
||||
*
|
||||
* @param plainContent the original String content
|
||||
* @param requestedElements the number of pieces of content to return
|
||||
* @return a list containing {@code requestedElements} elements
|
||||
*/
|
||||
private static List<String> splitTextIntoSections(String plainContent, int requestedElements = 2) {
|
||||
Number contentSectionLength = plainContent.size().intdiv(requestedElements)
|
||||
def content = []
|
||||
int start, end = 0
|
||||
requestedElements.times { int i ->
|
||||
start = i * contentSectionLength
|
||||
end = (i + 1) * contentSectionLength
|
||||
content << plainContent[start..<end]
|
||||
}
|
||||
content
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to verify the provided claim content equals the expected plain content,
|
||||
* decrypted via the {@link EncryptedFileSystemRepository#read()} method.
|
||||
*
|
||||
* @param claim the claim to verify
|
||||
* @param plainBytes the expected content once decrypted
|
||||
* @param description a message for contextualized log output
|
||||
* @return the retrieved, decrypted bytes
|
||||
*/
|
||||
private byte[] verifyClaimDecryption(ContentClaim claim, byte[] plainBytes, String description = "claim") {
|
||||
final InputStream inputStream = repository.read(claim)
|
||||
byte[] retrievedBytes = inputStream.bytes
|
||||
logger.info("Read ${description} bytes via repository (${retrievedBytes.length}): ${pba(retrievedBytes)}")
|
||||
|
||||
// Assert
|
||||
assertArrayEquals(plainBytes, retrievedBytes)
|
||||
return retrievedBytes
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create <em>n</em> claims.
|
||||
*
|
||||
* @param n the number of claims to create
|
||||
* @param isLossTolerant true if the claims are loss tolerant
|
||||
* @return the list of claims
|
||||
*/
|
||||
private List<ContentClaim> createClaims(int n, boolean isLossTolerant = false) {
|
||||
def claims = []
|
||||
n.times {
|
||||
claims << repository.create(isLossTolerant)
|
||||
}
|
||||
claims
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to form a map out of parallel lists of claims and their respective
|
||||
* content (converts to bytes if in a String).
|
||||
*
|
||||
* @param claims the list of claims
|
||||
* @param content the list of content (indexed in the same order)
|
||||
* @return the map of the claims and content
|
||||
*/
|
||||
private static Map<ContentClaim, byte[]> formClaimMap(List<ContentClaim> claims, List content) {
|
||||
def claimMap = [:]
|
||||
claims.eachWithIndex { ContentClaim claim, int i ->
|
||||
def element = content[i]
|
||||
claimMap << [(claim): element instanceof byte[] ? element : element.bytes]
|
||||
}
|
||||
claimMap
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to iterate over a map of claim -> byte[] content and write it out.
|
||||
*
|
||||
* @param claimsAndContent a map of claims and their respective incoming content
|
||||
*/
|
||||
private void writeContentToClaims(Map<ContentClaim, byte[]> claimsAndContent) {
|
||||
claimsAndContent.each { ContentClaim claim, byte[] content ->
|
||||
writeContentToClaim(claim, content)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to write the content to a claim.
|
||||
*
|
||||
* @param claim the claim
|
||||
* @param content the byte[] to write
|
||||
*/
|
||||
private void writeContentToClaim(ContentClaim claim, byte[] content) {
|
||||
// Write to the content repository (encrypted)
|
||||
final OutputStream out = repository.write(claim)
|
||||
out.write(content)
|
||||
out.flush()
|
||||
out.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a truncated byte[] in hexadecimal encoding as a String.
|
||||
*
|
||||
* @param bytes the byte[]
|
||||
* @param length the length in bytes to show (default 16)
|
||||
* @return the hex-encoded representation of {@code length} bytes
|
||||
*/
|
||||
private static String pba(byte[] bytes, int length = 16) {
|
||||
"[${Hex.toHexString(bytes)[0..<(Math.min(length, bytes.length * 2))]}${bytes.length * 2 > length ? "..." : ""}]"
|
||||
}
|
||||
}
|
|
@ -1,233 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.wali
|
||||
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue
|
||||
import org.apache.nifi.controller.repository.EncryptedSchemaRepositoryRecordSerde
|
||||
import org.apache.nifi.controller.repository.LiveSerializedRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.RepositoryRecordType
|
||||
import org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde
|
||||
import org.apache.nifi.controller.repository.SerializedRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.StandardFlowFileRecord
|
||||
import org.apache.nifi.controller.repository.StandardRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager
|
||||
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
|
||||
import org.apache.nifi.repository.schema.NoOpFieldCache
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.TestInfo
|
||||
import org.junit.jupiter.api.condition.DisabledOnOs
|
||||
import org.junit.jupiter.api.condition.OS
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.wali.SerDe
|
||||
import org.wali.SerDeFactory
|
||||
import org.wali.SingletonSerDeFactory
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue
|
||||
|
||||
@DisabledOnOs(OS.WINDOWS)
|
||||
class EncryptedSequentialAccessWriteAheadLogTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLogTest.class)
|
||||
|
||||
public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"
|
||||
|
||||
private ResourceClaimManager claimManager
|
||||
private FlowFileQueue flowFileQueue
|
||||
private ByteArrayOutputStream byteArrayOutputStream
|
||||
private DataOutputStream dataOutputStream
|
||||
|
||||
// TODO: Mock the wrapped serde
|
||||
// TODO: Make integration test with real wrapped serde
|
||||
private SerDe<SerializedRepositoryRecord> wrappedSerDe
|
||||
|
||||
private static final String KPI = StaticKeyProvider.class.name
|
||||
private static final String KEY_ID = "K1"
|
||||
private static final String KEY = "0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210"
|
||||
|
||||
private NiFiProperties properties
|
||||
|
||||
private EncryptedSchemaRepositoryRecordSerde esrrs
|
||||
|
||||
private TestInfo testName
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.debug("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void setUp(TestInfo testInfo) throws Exception {
|
||||
testName = testInfo
|
||||
claimManager = new StandardResourceClaimManager()
|
||||
flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER)
|
||||
byteArrayOutputStream = new ByteArrayOutputStream()
|
||||
dataOutputStream = new DataOutputStream(byteArrayOutputStream)
|
||||
wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager, new NoOpFieldCache())
|
||||
|
||||
properties = NiFiProperties.createBasicNiFiProperties(null, [
|
||||
(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): KPI,
|
||||
(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID) : KEY_ID,
|
||||
(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY) : KEY
|
||||
])
|
||||
|
||||
esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, properties)
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() throws Exception {
|
||||
claimManager.purge()
|
||||
}
|
||||
|
||||
private FlowFileQueue createMockQueue(String identifier = testName.testMethod.get().name + new Date().toString()) {
|
||||
[getIdentifier: { ->
|
||||
logger.mock("Retrieving flowfile queue identifier: ${identifier}" as String)
|
||||
identifier
|
||||
}] as FlowFileQueue
|
||||
}
|
||||
|
||||
private FlowFileQueue createAndRegisterMockQueue(String identifier = testName.testMethod.get().name + new Date().toString()) {
|
||||
FlowFileQueue queue = createMockQueue(identifier)
|
||||
queue
|
||||
}
|
||||
|
||||
private SerializedRepositoryRecord buildCreateRecord(FlowFileQueue queue, Map<String, String> attributes = [:]) {
|
||||
StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
|
||||
StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
|
||||
ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
|
||||
record.setWorking(ffrb.build(), false)
|
||||
|
||||
return new LiveSerializedRepositoryRecord(record)
|
||||
}
|
||||
|
||||
private String getMockUUID() {
|
||||
"${testName.testMethod.get().name ?: "no_test"}@${new Date().format("mmssSSS")}" as String
|
||||
}
|
||||
|
||||
/** This test creates flowfile records, adds them to the repository, and then recovers them to ensure they were persisted */
|
||||
@Test
|
||||
void testShouldUpdateWithExternalFile() {
|
||||
// Arrange
|
||||
final EncryptedSchemaRepositoryRecordSerde encryptedSerde = buildEncryptedSerDe()
|
||||
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> repo = createWriteRepo(encryptedSerde)
|
||||
|
||||
final List<SerializedRepositoryRecord> records = new ArrayList<>()
|
||||
10.times { int i ->
|
||||
def attributes = [name: "User ${i}" as String, age: "${i}" as String]
|
||||
final SerializedRepositoryRecord record = buildCreateRecord(flowFileQueue, attributes)
|
||||
records.add(record)
|
||||
}
|
||||
|
||||
// Act
|
||||
repo.update(records, false)
|
||||
repo.shutdown()
|
||||
|
||||
// Assert
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> recoveryRepo = createRecoveryRepo()
|
||||
final Collection<SerializedRepositoryRecord> recovered = recoveryRepo.recoverRecords()
|
||||
|
||||
// Ensure that the same records are returned (order is not guaranteed)
|
||||
assertEquals(records.size(), recovered.size())
|
||||
recovered.forEach(it -> assertEquals(RepositoryRecordType.CREATE, it.type))
|
||||
|
||||
// Check that all attributes (flowfile record) in the recovered records were present in the original list
|
||||
recovered.forEach(it -> assertTrue(it.getFlowFileRecord() in records*.getFlowFileRecord()))
|
||||
}
|
||||
|
||||
/** This test creates flowfile records, adds them to the repository, and then recovers them to ensure they were persisted */
|
||||
@Test
|
||||
void testShouldUpdateWithExternalFileAfterCheckpoint() {
|
||||
// Arrange
|
||||
final EncryptedSchemaRepositoryRecordSerde encryptedSerde = buildEncryptedSerDe()
|
||||
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> repo = createWriteRepo(encryptedSerde)
|
||||
|
||||
final List<SerializedRepositoryRecord> records = new ArrayList<>()
|
||||
10_000.times { int i ->
|
||||
def attributes = [name: "User ${i}" as String, age: "${i}" as String]
|
||||
final SerializedRepositoryRecord record = buildCreateRecord(flowFileQueue, attributes)
|
||||
records.add(record)
|
||||
}
|
||||
|
||||
// Act
|
||||
repo.update(records, false)
|
||||
repo.shutdown()
|
||||
|
||||
// Assert
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> recoveryRepo = createRecoveryRepo()
|
||||
final Collection<SerializedRepositoryRecord> recovered = recoveryRepo.recoverRecords()
|
||||
|
||||
// Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed)
|
||||
assertEquals(records.size(), recovered.size())
|
||||
recovered.forEach(it -> assertEquals( RepositoryRecordType.CREATE, it.type))
|
||||
}
|
||||
|
||||
private EncryptedSchemaRepositoryRecordSerde buildEncryptedSerDe() {
|
||||
final StandardRepositoryRecordSerdeFactory factory = new StandardRepositoryRecordSerdeFactory(claimManager)
|
||||
SchemaRepositoryRecordSerde wrappedSerDe = factory.createSerDe() as SchemaRepositoryRecordSerde
|
||||
return new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, properties)
|
||||
}
|
||||
|
||||
private SequentialAccessWriteAheadLog<SerializedRepositoryRecord> createWriteRepo(final SerDe<SerializedRepositoryRecord> serde) throws IOException {
|
||||
final File targetDir = new File("target")
|
||||
final File storageDir = new File(targetDir, testName.testMethod.get().name)
|
||||
deleteRecursively(storageDir)
|
||||
assertTrue(storageDir.mkdirs())
|
||||
|
||||
final SerDeFactory<SerializedRepositoryRecord> serdeFactory = new SingletonSerDeFactory<>(serde)
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory)
|
||||
|
||||
final Collection<SerializedRepositoryRecord> recovered = repo.recoverRecords()
|
||||
assertNotNull(recovered)
|
||||
assertTrue(recovered.isEmpty())
|
||||
|
||||
return repo
|
||||
}
|
||||
|
||||
private SequentialAccessWriteAheadLog<SerializedRepositoryRecord> createRecoveryRepo() throws IOException {
|
||||
final File targetDir = new File("target")
|
||||
final File storageDir = new File(targetDir, testName.testMethod.get().name)
|
||||
|
||||
final SerDe<SerializedRepositoryRecord> serde = buildEncryptedSerDe()
|
||||
final SerDeFactory<SerializedRepositoryRecord> serdeFactory = new SingletonSerDeFactory<>(serde)
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory)
|
||||
|
||||
return repo
|
||||
}
|
||||
|
||||
private void deleteRecursively(final File file) {
|
||||
final File[] children = file.listFiles()
|
||||
if (children != null) {
|
||||
for (final File child : children) {
|
||||
deleteRecursively(child)
|
||||
}
|
||||
}
|
||||
|
||||
file.delete()
|
||||
}
|
||||
}
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.controller;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||
import org.apache.nifi.controller.repository.FlowFileRecord;
|
||||
import org.apache.nifi.controller.repository.FlowFileRepository;
|
||||
|
@ -26,18 +25,25 @@ import org.apache.nifi.controller.repository.SwapContents;
|
|||
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
@ -49,8 +55,43 @@ import static org.mockito.Mockito.when;
|
|||
* Test cases for {@link EncryptedFileSystemSwapManager}.
|
||||
*/
|
||||
public class TestEncryptedFileSystemSwapManager {
|
||||
private static final String KEYSTORE_CREDENTIALS = UUID.randomUUID().toString();
|
||||
|
||||
private static final String KEYSTORE_NAME = "repository.p12";
|
||||
|
||||
private static final String KEY_ID = "primary-key";
|
||||
|
||||
private static final String KEYSTORE_TYPE = "PKCS12";
|
||||
|
||||
private static final int KEY_LENGTH = 32;
|
||||
|
||||
private static final String KEY_ALGORITHM = "AES";
|
||||
|
||||
private static Path keyStorePath;
|
||||
|
||||
@BeforeAll
|
||||
public static void setRepositoryKeystore(@TempDir final Path temporaryDirectory) throws GeneralSecurityException, IOException {
|
||||
keyStorePath = temporaryDirectory.resolve(KEYSTORE_NAME);
|
||||
|
||||
final SecureRandom secureRandom = new SecureRandom();
|
||||
final byte[] key = new byte[KEY_LENGTH];
|
||||
secureRandom.nextBytes(key);
|
||||
final SecretKeySpec secretKeySpec = new SecretKeySpec(key, KEY_ALGORITHM);
|
||||
|
||||
final KeyStore keyStore = KeyStore.getInstance(KEYSTORE_TYPE);
|
||||
keyStore.load(null);
|
||||
|
||||
final KeyStore.SecretKeyEntry secretKeyEntry = new KeyStore.SecretKeyEntry(secretKeySpec);
|
||||
final KeyStore.PasswordProtection passwordProtection = new KeyStore.PasswordProtection(KEYSTORE_CREDENTIALS.toCharArray());
|
||||
keyStore.setEntry(KEY_ID, secretKeyEntry, passwordProtection);
|
||||
|
||||
try (final OutputStream outputStream = Files.newOutputStream(keyStorePath)) {
|
||||
keyStore.store(outputStream, KEYSTORE_CREDENTIALS.toCharArray());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a simple swap to disk / swap from disk operation. Configured to use {@link StaticKeyProvider}.
|
||||
* Test a simple swap to disk / swap from disk operation
|
||||
*/
|
||||
@Test
|
||||
public void testSwapOutSwapIn() throws GeneralSecurityException, IOException {
|
||||
|
@ -60,10 +101,7 @@ public class TestEncryptedFileSystemSwapManager {
|
|||
new File(folderRepository, "swap").deleteOnExit();
|
||||
|
||||
// configure a nifi properties for encrypted swap file
|
||||
final Properties properties = new Properties();
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, StaticKeyProvider.class.getName());
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, StringUtils.repeat("00", 32));
|
||||
final Map<String, String> properties = getEncryptionProperties();
|
||||
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, folderRepository.getPath());
|
||||
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
|
||||
|
||||
|
@ -107,4 +145,14 @@ public class TestEncryptedFileSystemSwapManager {
|
|||
|
||||
return swapManager;
|
||||
}
|
||||
|
||||
private Map<String, String> getEncryptionProperties() {
|
||||
final Map<String, String> encryptedRepoProperties = new HashMap<>();
|
||||
encryptedRepoProperties.put("nifi.repository.encryption.protocol.version", "1");
|
||||
encryptedRepoProperties.put("nifi.repository.encryption.key.id", KEY_ID);
|
||||
encryptedRepoProperties.put("nifi.repository.encryption.key.provider", "KEYSTORE");
|
||||
encryptedRepoProperties.put("nifi.repository.encryption.key.provider.keystore.location", keyStorePath.toString());
|
||||
encryptedRepoProperties.put("nifi.repository.encryption.key.provider.keystore.password", KEYSTORE_CREDENTIALS);
|
||||
return encryptedRepoProperties;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,10 +52,6 @@ class ProtectedNiFiProperties extends NiFiProperties implements ProtectedPropert
|
|||
SECURITY_KEYSTORE_PASSWD,
|
||||
SECURITY_TRUSTSTORE_PASSWD,
|
||||
SENSITIVE_PROPS_KEY,
|
||||
PROVENANCE_REPO_ENCRYPTION_KEY,
|
||||
PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_PASSWORD,
|
||||
FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD,
|
||||
CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_PASSWORD,
|
||||
REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_PASSWORD,
|
||||
SECURITY_USER_OIDC_CLIENT_SECRET
|
||||
));
|
||||
|
|
|
@ -88,7 +88,7 @@ public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanc
|
|||
// Build a factory using lambda which injects the encryptor
|
||||
final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> {
|
||||
final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
|
||||
final String keyId = niFiProperties.getProvenanceRepoEncryptionKeyId();
|
||||
final String keyId = niFiProperties.getRepositoryEncryptionKeyId();
|
||||
return new EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, BLOCK_SIZE, idLookup, repositoryEncryptor, keyId);
|
||||
};
|
||||
|
||||
|
|
|
@ -1,267 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.provenance
|
||||
|
||||
import org.apache.nifi.events.EventReporter
|
||||
import org.apache.nifi.flowfile.FlowFile
|
||||
import org.apache.nifi.reporting.Severity
|
||||
import org.apache.nifi.security.kms.StaticKeyProvider
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.apache.nifi.util.file.FileUtils
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import java.security.Security
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile
|
||||
import static org.hamcrest.MatcherAssert.assertThat
|
||||
import static org.hamcrest.CoreMatchers.is
|
||||
import static org.hamcrest.CoreMatchers.hasItems
|
||||
|
||||
class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
||||
private static final String KEY_HEX = KEY_HEX_128
|
||||
private static final String KEY_ID = "K1"
|
||||
|
||||
private static final String TRANSIT_URI = "nifi://unit-test"
|
||||
private static final String PROCESSOR_TYPE = "Mock Processor"
|
||||
private static final String COMPONENT_ID = "1234"
|
||||
|
||||
private static final AtomicLong recordId = new AtomicLong()
|
||||
|
||||
private ProvenanceRepository repo
|
||||
private static RepositoryConfiguration config
|
||||
private File provenanceRepositoryDirectory
|
||||
|
||||
private EventReporter eventReporter
|
||||
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
|
||||
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
provenanceRepositoryDirectory = File.createTempDir(getClass().simpleName)
|
||||
reportedEvents?.clear()
|
||||
eventReporter = createMockEventReporter()
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() throws Exception {
|
||||
closeRepo(repo, config)
|
||||
if (provenanceRepositoryDirectory != null & provenanceRepositoryDirectory.isDirectory()) {
|
||||
provenanceRepositoryDirectory.deleteDir()
|
||||
}
|
||||
}
|
||||
|
||||
private static RepositoryConfiguration createConfiguration(final File provenanceDir) {
|
||||
final RepositoryConfiguration config = new RepositoryConfiguration()
|
||||
config.addStorageDirectory("1", provenanceDir)
|
||||
config.setCompressOnRollover(true)
|
||||
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
|
||||
config.setCompressionBlockBytes(100)
|
||||
return config
|
||||
}
|
||||
|
||||
private EventReporter createMockEventReporter() {
|
||||
[reportEvent: { Severity s, String c, String m ->
|
||||
ReportedEvent event = new ReportedEvent(s, c, m)
|
||||
reportedEvents.add(event)
|
||||
}] as EventReporter
|
||||
}
|
||||
|
||||
private void closeRepo(final ProvenanceRepository repo = this.repo, final RepositoryConfiguration config = this.config) throws IOException {
|
||||
if (repo == null) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
repo.close()
|
||||
} catch (final IOException ignored) {
|
||||
// intentionally blank
|
||||
}
|
||||
|
||||
// Delete all of the storage files. We do this in order to clean up the tons of files that
|
||||
// we create but also to ensure that we have closed all of the file handles. If we leave any
|
||||
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
|
||||
if (config != null) {
|
||||
for (final File storageDir : config.getStorageDirectories().values()) {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
FileUtils.deleteFile(storageDir, true)
|
||||
break
|
||||
} catch (IOException ioe) {
|
||||
// if there is a virus scanner, etc. running in the background we may not be able to
|
||||
// delete the file. Wait a sec and try again.
|
||||
if (i == 2) {
|
||||
throw ioe
|
||||
} else {
|
||||
try {
|
||||
System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists())
|
||||
FileUtils.deleteFile(storageDir, true)
|
||||
break
|
||||
} catch (final IOException ioe2) {
|
||||
// if there is a virus scanner, etc. running in the background we may not be able to
|
||||
// delete the file. Wait a sec and try again.
|
||||
if (i == 2) {
|
||||
throw ioe2
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(1000L)
|
||||
} catch (final InterruptedException ignored) {
|
||||
// intentionally blank
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final FlowFile buildFlowFile(final Map attributes = [:], final long id = recordId.getAndIncrement(), final long fileSize = 3000L) {
|
||||
if (!attributes?.uuid) {
|
||||
attributes.uuid = UUID.randomUUID().toString()
|
||||
}
|
||||
createFlowFile(id, fileSize, attributes)
|
||||
}
|
||||
|
||||
private static ProvenanceEventRecord buildEventRecord(final FlowFile flowfile = buildFlowFile(), final ProvenanceEventType eventType = ProvenanceEventType.RECEIVE,
|
||||
final String transitUri = TRANSIT_URI, final String componentId = COMPONENT_ID,
|
||||
final String componentType = PROCESSOR_TYPE, final long eventTime = System.currentTimeMillis()) {
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
|
||||
builder.setEventTime(eventTime)
|
||||
builder.setEventType(eventType)
|
||||
builder.setTransitUri(transitUri)
|
||||
builder.fromFlowFile(flowfile)
|
||||
builder.setComponentId(componentId)
|
||||
builder.setComponentType(componentType)
|
||||
builder.build()
|
||||
}
|
||||
|
||||
/**
|
||||
* This test operates on {@link WriteAheadProvenanceRepository} to verify the normal operations of existing implementations.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
|
||||
// Arrange
|
||||
config = createConfiguration(provenanceRepositoryDirectory)
|
||||
// Needed until NIFI-3605 is implemented
|
||||
// config.setMaxEventFileCapacity(1L)
|
||||
config.setMaxEventFileCount(1)
|
||||
config.setMaxEventFileLife(1, TimeUnit.SECONDS)
|
||||
repo = new WriteAheadProvenanceRepository(config)
|
||||
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
||||
|
||||
final Map attributes = ["abc": "xyz",
|
||||
"123": "456"]
|
||||
final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
|
||||
|
||||
final int RECORD_COUNT = 10
|
||||
|
||||
// Act
|
||||
RECORD_COUNT.times {
|
||||
repo.registerEvent(record)
|
||||
}
|
||||
|
||||
// Ensure there is not a timing issue retrieving all records
|
||||
Thread.sleep(1000)
|
||||
|
||||
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
|
||||
|
||||
// Assert
|
||||
assertThat(recoveredRecords.size(), is(RECORD_COUNT))
|
||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
|
||||
assertThat(recovered.getEventId(), is(i as Long))
|
||||
assertThat(recovered.getTransitUri(), is(TRANSIT_URI))
|
||||
assertThat(recovered.getEventType(), is(ProvenanceEventType.RECEIVE))
|
||||
// The UUID was added later but we care that all attributes we provided are still there
|
||||
assertThat(recovered.getAttributes().entrySet(), hasItems(attributes.entrySet().toArray() as Map.Entry<String, String>[]))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptedWriteAheadProvenanceRepositoryShouldRegisterAndGetEvents() {
|
||||
// Arrange
|
||||
final int RECORD_COUNT = 10
|
||||
|
||||
// ensure NiFiProperties are converted to RepositoryConfig during encrypted repo constructor
|
||||
final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
|
||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
|
||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY): KEY_HEX,
|
||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID,
|
||||
(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + "test"): provenanceRepositoryDirectory.toString()
|
||||
])
|
||||
|
||||
repo = new EncryptedWriteAheadProvenanceRepository(properties)
|
||||
config = repo.getConfig()
|
||||
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
||||
|
||||
final Map attributes = ["abc": "This is a plaintext attribute.",
|
||||
"123": "This is another plaintext attribute."]
|
||||
final List<ProvenanceEventRecord> records = []
|
||||
RECORD_COUNT.times { int i ->
|
||||
records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
|
||||
}
|
||||
|
||||
final long LAST_RECORD_ID = repo.getMaxEventId()
|
||||
|
||||
// Act
|
||||
repo.registerEvents(records)
|
||||
|
||||
// Retrieve the events through the interface
|
||||
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
|
||||
|
||||
// Assert
|
||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
|
||||
assertThat(recoveredRecord.getEventId(), is(LAST_RECORD_ID + 1 + i))
|
||||
assertThat(recoveredRecord.getTransitUri(), is(TRANSIT_URI))
|
||||
assertThat(recoveredRecord.getEventType(), is(ProvenanceEventType.RECEIVE))
|
||||
// The UUID was added later but we care that all attributes we provided are still there
|
||||
assertThat(recoveredRecord.getAttributes().entrySet(), hasItems((Map.Entry<String, String>[])attributes.entrySet().toArray()))
|
||||
assertThat(recoveredRecord.getAttribute("count"), is(i as String))
|
||||
}
|
||||
}
|
||||
|
||||
private static class ReportedEvent {
|
||||
final Severity severity
|
||||
final String category
|
||||
final String message
|
||||
|
||||
ReportedEvent(final Severity severity, final String category, final String message) {
|
||||
this.severity = severity
|
||||
this.category = category
|
||||
this.message = message
|
||||
}
|
||||
|
||||
@Override
|
||||
String toString() {
|
||||
"ReportedEvent [${severity}] ${category}: ${message}"
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue