From ebe8b9a2e78d1d87046eb1d6c9f86e86203b6744 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Sat, 9 Sep 2023 09:38:54 -0500 Subject: [PATCH] NIFI-12033 Added EncryptContentAge and DecryptContentAge Processors This closes #7676 Signed-off-by: Paul Grey --- .../nifi-cipher-processors/pom.xml | 12 + .../processors/cipher/DecryptContentAge.java | 299 ++++++++++++++++++ .../processors/cipher/EncryptContentAge.java | 299 ++++++++++++++++++ .../cipher/age/AbstractAgeKeyReader.java | 78 +++++ .../cipher/age/AgeKeyIndicator.java | 47 +++ .../processors/cipher/age/AgeKeyReader.java | 37 +++ .../cipher/age/AgeKeyValidator.java | 146 +++++++++ .../cipher/age/AgePrivateKeyReader.java | 72 +++++ .../cipher/age/AgeProviderResolver.java | 72 +++++ .../cipher/age/AgePublicKeyReader.java | 72 +++++ .../processors/cipher/age/FileEncoding.java | 49 +++ .../nifi/processors/cipher/age/KeySource.java | 49 +++ .../cipher/io/ChannelStreamCallback.java | 98 ++++++ .../org.apache.nifi.processor.Processor | 2 + .../cipher/DecryptContentAgeTest.java | 232 ++++++++++++++ .../cipher/EncryptContentAgeTest.java | 246 ++++++++++++++ .../cipher/age/AgePrivateKeyReaderTest.java | 66 ++++ .../cipher/age/AgePublicKeyReaderTest.java | 66 ++++ .../cipher/io/ChannelStreamCallbackTest.java | 123 +++++++ nifi-nar-bundles/nifi-cipher-bundle/pom.xml | 12 + 20 files changed, 2077 insertions(+) create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/DecryptContentAge.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/EncryptContentAge.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AbstractAgeKeyReader.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyIndicator.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyReader.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyValidator.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReader.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeProviderResolver.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReader.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/FileEncoding.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/KeySource.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallback.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/DecryptContentAgeTest.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/EncryptContentAgeTest.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReaderTest.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReaderTest.java create mode 100644 nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallbackTest.java diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/pom.xml b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/pom.xml index 920c9b68da..1df0a3fa6c 100644 --- a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/pom.xml @@ -43,6 +43,18 @@ org.bouncycastle bcprov-jdk18on + + com.exceptionfactory.jagged + jagged-x25519 + + + com.exceptionfactory.jagged + jagged-framework + + + com.exceptionfactory.jagged + jagged-api + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/DecryptContentAge.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/DecryptContentAge.java new file mode 100644 index 0000000000..833e4a47c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/DecryptContentAge.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher; + +import com.exceptionfactory.jagged.DecryptingChannelFactory; +import com.exceptionfactory.jagged.RecipientStanzaReader; +import com.exceptionfactory.jagged.framework.armor.ArmoredDecryptingChannelFactory; +import com.exceptionfactory.jagged.framework.stream.StandardDecryptingChannelFactory; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceReference; +import org.apache.nifi.components.resource.ResourceReferences; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.cipher.age.AgeKeyIndicator; +import org.apache.nifi.processors.cipher.age.AgeKeyReader; +import org.apache.nifi.processors.cipher.age.AgeKeyValidator; +import org.apache.nifi.processors.cipher.age.AgePrivateKeyReader; +import org.apache.nifi.processors.cipher.age.AgeProviderResolver; +import org.apache.nifi.processors.cipher.age.KeySource; +import org.apache.nifi.processors.cipher.io.ChannelStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"age", "age-encryption.org", "encryption", "ChaCha20-Poly1305", "X25519"}) +@CapabilityDescription( + "Decrypt content using the age-encryption.org/v1 specification. " + + "Detects binary or ASCII armored content encoding using the initial file header bytes. " + + "The age standard uses ChaCha20-Poly1305 for authenticated encryption of the payload. " + + "The age-keygen command supports generating X25519 key pairs for encryption and decryption operations." +) +@SeeAlso({ EncryptContentAge.class }) +public class DecryptContentAge extends AbstractProcessor implements VerifiableProcessor { + + static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Decryption Completed") + .build(); + + static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("Decryption Failed") + .build(); + + static final PropertyDescriptor PRIVATE_KEY_SOURCE = new PropertyDescriptor.Builder() + .name("Private Key Source") + .displayName("Private Key Source") + .description("Source of information determines the loading strategy for X25519 Private Key Identities") + .required(true) + .defaultValue(KeySource.PROPERTIES.getValue()) + .allowableValues(KeySource.class) + .build(); + + static final PropertyDescriptor PRIVATE_KEY_IDENTITIES = new PropertyDescriptor.Builder() + .name("Private Key Identities") + .displayName("Private Key Identities") + .description("One or more X25519 Private Key Identities, separated with newlines, encoded according to the age specification, starting with AGE-SECRET-KEY-1") + .required(true) + .sensitive(true) + .addValidator(new AgeKeyValidator(AgeKeyIndicator.PRIVATE_KEY)) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT) + .dependsOn(PRIVATE_KEY_SOURCE, KeySource.PROPERTIES) + .build(); + + static final PropertyDescriptor PRIVATE_KEY_IDENTITY_RESOURCES = new PropertyDescriptor.Builder() + .name("Private Key Identity Resources") + .displayName("Private Key Identity Resources") + .description("One or more files or URLs containing X25519 Private Key Identities, separated with newlines, encoded according to the age specification, starting with AGE-SECRET-KEY-1") + .required(true) + .addValidator(new AgeKeyValidator(AgeKeyIndicator.PRIVATE_KEY)) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.URL) + .dependsOn(PRIVATE_KEY_SOURCE, KeySource.RESOURCES) + .build(); + + private static final Set RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE)); + + private static final List DESCRIPTORS = Arrays.asList( + PRIVATE_KEY_SOURCE, + PRIVATE_KEY_IDENTITIES, + PRIVATE_KEY_IDENTITY_RESOURCES + ); + + private static final AgeKeyReader PRIVATE_KEY_READER = new AgePrivateKeyReader(); + + private static final Provider CIPHER_PROVIDER = AgeProviderResolver.getCipherProvider(); + + /** 64 Kilobyte buffer plus 16 bytes for authentication tag aligns with age-encryption payload chunk sizing */ + private static final int BUFFER_CAPACITY = 65552; + + /** age-encryption.org version indicator at the beginning of binary encoded files */ + private static final String VERSION_INDICATOR = "age-encryption.org"; + + private static final byte[] BINARY_VERSION_INDICATOR = VERSION_INDICATOR.getBytes(StandardCharsets.US_ASCII); + + private static final int INPUT_BUFFER_SIZE = BINARY_VERSION_INDICATOR.length; + + private static final String KEY_VERIFICATION_STEP = "Verify Private Key Identities"; + + private static final String NOT_FOUND_EXPLANATION = "Private Key Identities not found"; + + private volatile List configuredRecipientStanzaReaders = Collections.emptyList(); + + /** + * Get Relationships + * + * @return Processor Relationships + */ + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + /** + * Get Supported Property Descriptors + * + * @return Processor Supported Property Descriptors + */ + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * Verify Private Key Identities + * + * @param context Process Context with configured properties + * @param verificationLogger Logger for writing verification results + * @param attributes Sample FlowFile attributes for property value resolution + * @return Configuration Verification Results + */ + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) { + final List results = new ArrayList<>(); + + final ConfigVerificationResult.Builder verificationBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(KEY_VERIFICATION_STEP); + + try { + final List recipientStanzaReaders = getRecipientStanzaReaders(context); + + if (recipientStanzaReaders.isEmpty()) { + verificationLogger.warn(NOT_FOUND_EXPLANATION); + verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(NOT_FOUND_EXPLANATION); + } else { + final String explanation = String.format("Private Key Identities found: %d", recipientStanzaReaders.size()); + verificationLogger.info(explanation); + verificationBuilder.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(explanation); + } + } catch (final Exception e) { + final String explanation = String.format("%s: %s", NOT_FOUND_EXPLANATION, e); + verificationLogger.warn(NOT_FOUND_EXPLANATION, e); + + verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(explanation); + } + + results.add(verificationBuilder.build()); + return results; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + configuredRecipientStanzaReaders = getRecipientStanzaReaders(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + final StreamCallback streamCallback = new DecryptingStreamCallback(configuredRecipientStanzaReaders); + flowFile = session.write(flowFile, streamCallback); + + session.transfer(flowFile, SUCCESS); + } catch (final Exception e) { + getLogger().error("Decryption Failed {}", flowFile, e); + session.transfer(flowFile, FAILURE); + } + } + + private List getRecipientStanzaReaders(final PropertyContext context) throws IOException { + final KeySource keySource = KeySource.valueOf(context.getProperty(PRIVATE_KEY_SOURCE).getValue()); + + final List resources = new ArrayList<>(); + + if (KeySource.PROPERTIES == keySource) { + final ResourceReference resource = context.getProperty(PRIVATE_KEY_IDENTITIES).asResource(); + resources.add(resource); + } else { + final ResourceReferences resourceReferences = context.getProperty(PRIVATE_KEY_IDENTITY_RESOURCES).asResources(); + resources.addAll(resourceReferences.asList()); + } + + final List recipientStanzaReaders = new ArrayList<>(); + + for (final ResourceReference resource : resources) { + try (final InputStream inputStream = resource.read()) { + final List readers = PRIVATE_KEY_READER.read(inputStream); + recipientStanzaReaders.addAll(readers); + } + } + + if (recipientStanzaReaders.isEmpty()) { + throw new IOException(NOT_FOUND_EXPLANATION); + } + + return recipientStanzaReaders; + } + + private static class DecryptingStreamCallback extends ChannelStreamCallback { + private final List recipientStanzaReaders; + + private DecryptingStreamCallback(final List recipientStanzaReaders) { + super(BUFFER_CAPACITY); + this.recipientStanzaReaders = recipientStanzaReaders; + } + + @Override + protected ReadableByteChannel getReadableChannel(final InputStream inputStream) throws IOException { + final PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, INPUT_BUFFER_SIZE); + final byte[] versionIndicator = getVersionIndicator(pushbackInputStream); + + final DecryptingChannelFactory decryptingChannelFactory = getDecryptingChannelFactory(versionIndicator); + try { + final ReadableByteChannel inputChannel = super.getReadableChannel(pushbackInputStream); + return decryptingChannelFactory.newDecryptingChannel(inputChannel, recipientStanzaReaders); + } catch (final GeneralSecurityException e) { + throw new IOException("Channel initialization failed", e); + } + } + + private byte[] getVersionIndicator(final PushbackInputStream pushbackInputStream) throws IOException { + final byte[] versionIndicator = new byte[INPUT_BUFFER_SIZE]; + StreamUtils.fillBuffer(pushbackInputStream, versionIndicator); + pushbackInputStream.unread(versionIndicator); + return versionIndicator; + } + + private DecryptingChannelFactory getDecryptingChannelFactory(final byte[] versionIndicator) { + final DecryptingChannelFactory decryptingChannelFactory; + + if (Arrays.equals(BINARY_VERSION_INDICATOR, versionIndicator)) { + decryptingChannelFactory = new StandardDecryptingChannelFactory(CIPHER_PROVIDER); + } else { + decryptingChannelFactory = new ArmoredDecryptingChannelFactory(CIPHER_PROVIDER); + } + + return decryptingChannelFactory; + } + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/EncryptContentAge.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/EncryptContentAge.java new file mode 100644 index 0000000000..0d37127e50 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/EncryptContentAge.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher; + +import com.exceptionfactory.jagged.EncryptingChannelFactory; +import com.exceptionfactory.jagged.RecipientStanzaWriter; +import com.exceptionfactory.jagged.framework.armor.ArmoredEncryptingChannelFactory; +import com.exceptionfactory.jagged.framework.stream.StandardEncryptingChannelFactory; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceReference; +import org.apache.nifi.components.resource.ResourceReferences; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.cipher.age.AgeKeyIndicator; +import org.apache.nifi.processors.cipher.age.AgeKeyReader; +import org.apache.nifi.processors.cipher.age.AgeKeyValidator; +import org.apache.nifi.processors.cipher.age.AgePublicKeyReader; +import org.apache.nifi.processors.cipher.age.KeySource; +import org.apache.nifi.processors.cipher.io.ChannelStreamCallback; +import org.apache.nifi.processors.cipher.age.FileEncoding; +import org.apache.nifi.processors.cipher.age.AgeProviderResolver; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.WritableByteChannel; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"age", "age-encryption.org", "encryption", "ChaCha20-Poly1305", "X25519"}) +@CapabilityDescription( + "Encrypt content using the age-encryption.org/v1 specification. " + + "Supports binary or ASCII armored content encoding using configurable properties. " + + "The age standard uses ChaCha20-Poly1305 for authenticated encryption of the payload. " + + "The age-keygen command supports generating X25519 key pairs for encryption and decryption operations." +) +@SeeAlso({ DecryptContentAge.class }) +public class EncryptContentAge extends AbstractProcessor implements VerifiableProcessor { + + static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Encryption Completed") + .build(); + + static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("Encryption Failed") + .build(); + + static final PropertyDescriptor FILE_ENCODING = new PropertyDescriptor.Builder() + .name("File Encoding") + .displayName("File Encoding") + .description("Output encoding for encrypted files. Binary encoding provides optimal processing performance.") + .required(true) + .defaultValue(FileEncoding.BINARY.getValue()) + .allowableValues(FileEncoding.class) + .build(); + + static final PropertyDescriptor PUBLIC_KEY_SOURCE = new PropertyDescriptor.Builder() + .name("Public Key Source") + .displayName("Public Key Source") + .description("Source of information determines the loading strategy for X25519 Public Key Recipients") + .required(true) + .defaultValue(KeySource.PROPERTIES.getValue()) + .allowableValues(KeySource.class) + .build(); + + static final PropertyDescriptor PUBLIC_KEY_RECIPIENTS = new PropertyDescriptor.Builder() + .name("Public Key Recipients") + .displayName("Public Key Recipients") + .description("One or more X25519 Public Key Recipients, separated with newlines, encoded according to the age specification, starting with age1") + .required(true) + .sensitive(true) + .addValidator(new AgeKeyValidator(AgeKeyIndicator.PUBLIC_KEY)) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT) + .dependsOn(PUBLIC_KEY_SOURCE, KeySource.PROPERTIES) + .build(); + + static final PropertyDescriptor PUBLIC_KEY_RECIPIENT_RESOURCES = new PropertyDescriptor.Builder() + .name("Public Key Recipient Resources") + .displayName("Public Key Recipient Resources") + .description("One or more files or URLs containing X25519 Public Key Recipients, separated with newlines, encoded according to the age specification, starting with age1") + .required(true) + .addValidator(new AgeKeyValidator(AgeKeyIndicator.PUBLIC_KEY)) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.URL) + .dependsOn(PUBLIC_KEY_SOURCE, KeySource.RESOURCES) + .build(); + + private static final Set RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE)); + + private static final List DESCRIPTORS = Arrays.asList( + FILE_ENCODING, + PUBLIC_KEY_SOURCE, + PUBLIC_KEY_RECIPIENTS, + PUBLIC_KEY_RECIPIENT_RESOURCES + ); + + private static final Provider CIPHER_PROVIDER = AgeProviderResolver.getCipherProvider(); + + private static final AgeKeyReader PUBLIC_KEY_READER = new AgePublicKeyReader(); + + /** 64 Kilobyte buffer aligns with age-encryption payload chunk sizing */ + private static final int BUFFER_CAPACITY = 65535; + + private static final String KEY_VERIFICATION_STEP = "Verify Public Key Recipients"; + + private static final String NOT_FOUND_EXPLANATION = "Public Key Recipients not found"; + + private volatile List configuredRecipientStanzaWriters = Collections.emptyList(); + + /** + * Get Relationships + * + * @return Processor Relationships + */ + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + /** + * Get Supported Property Descriptors + * + * @return Processor Supported Property Descriptors + */ + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * Verify Public Key Identities + * + * @param context Process Context with configured properties + * @param verificationLogger Logger for writing verification results + * @param attributes Sample FlowFile attributes for property value resolution + * @return Configuration Verification Results + */ + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) { + final List results = new ArrayList<>(); + + final ConfigVerificationResult.Builder verificationBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(KEY_VERIFICATION_STEP); + + try { + final List recipientStanzaWriters = getRecipientStanzaWriters(context); + + if (recipientStanzaWriters.isEmpty()) { + verificationLogger.warn(NOT_FOUND_EXPLANATION); + verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(NOT_FOUND_EXPLANATION); + } else { + final String explanation = String.format("Public Key Recipients found: %d", recipientStanzaWriters.size()); + verificationLogger.info(explanation); + verificationBuilder.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(explanation); + } + } catch (final Exception e) { + final String explanation = String.format("Public Key Recipients not found: %s", e.getMessage()); + verificationLogger.warn("Public Key Recipients not found", e); + + verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(explanation); + } + + results.add(verificationBuilder.build()); + return results; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + configuredRecipientStanzaWriters = getRecipientStanzaWriters(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + final String fileEncodingValue = context.getProperty(FILE_ENCODING).getValue(); + final FileEncoding fileEncoding = FileEncoding.valueOf(fileEncodingValue); + final EncryptingChannelFactory encryptingChannelFactory = getEncryptingChannelFactory(fileEncoding); + final StreamCallback streamCallback = new EncryptingStreamCallback(configuredRecipientStanzaWriters, encryptingChannelFactory); + flowFile = session.write(flowFile, streamCallback); + + session.transfer(flowFile, SUCCESS); + } catch (final Exception e) { + getLogger().error("Encryption Failed {}", flowFile, e); + session.transfer(flowFile, FAILURE); + } + } + + private EncryptingChannelFactory getEncryptingChannelFactory(final FileEncoding fileEncoding) { + final EncryptingChannelFactory encryptingChannelFactory; + + if (FileEncoding.ASCII == fileEncoding) { + encryptingChannelFactory = new ArmoredEncryptingChannelFactory(CIPHER_PROVIDER); + } else { + encryptingChannelFactory = new StandardEncryptingChannelFactory(CIPHER_PROVIDER); + } + + return encryptingChannelFactory; + } + + private List getRecipientStanzaWriters(final PropertyContext context) throws IOException { + final KeySource keySource = KeySource.valueOf(context.getProperty(PUBLIC_KEY_SOURCE).getValue()); + + final List resources = new ArrayList<>(); + + if (KeySource.PROPERTIES == keySource) { + final ResourceReference resource = context.getProperty(PUBLIC_KEY_RECIPIENTS).asResource(); + resources.add(resource); + } else { + final ResourceReferences resourceReferences = context.getProperty(PUBLIC_KEY_RECIPIENT_RESOURCES).asResources(); + resources.addAll(resourceReferences.asList()); + } + + final List recipientStanzaWriters = new ArrayList<>(); + + for (final ResourceReference resource : resources) { + try (final InputStream inputStream = resource.read()) { + final List writers = PUBLIC_KEY_READER.read(inputStream); + recipientStanzaWriters.addAll(writers); + } + } + + if (recipientStanzaWriters.isEmpty()) { + throw new IOException(NOT_FOUND_EXPLANATION); + } + + return recipientStanzaWriters; + } + + private static class EncryptingStreamCallback extends ChannelStreamCallback { + private final List recipientStanzaWriters; + + private final EncryptingChannelFactory encryptingChannelFactory; + + private EncryptingStreamCallback( + final List recipientStanzaWriters, + final EncryptingChannelFactory encryptingChannelFactory + ) { + super(BUFFER_CAPACITY); + this.recipientStanzaWriters = recipientStanzaWriters; + this.encryptingChannelFactory = encryptingChannelFactory; + } + + @Override + protected WritableByteChannel getWritableChannel(final OutputStream outputStream) throws IOException { + try { + final WritableByteChannel outputChannel = super.getWritableChannel(outputStream); + return encryptingChannelFactory.newEncryptingChannel(outputChannel, recipientStanzaWriters); + } catch (final GeneralSecurityException e) { + throw new IOException("Channel initialization failed", e); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AbstractAgeKeyReader.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AbstractAgeKeyReader.java new file mode 100644 index 0000000000..ec5c0ec23b --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AbstractAgeKeyReader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +/** + * Shared key reader abstraction for age-encryption.org public and private keys using configurable prefix and pattern + * + * @param Type of parsed key + */ +abstract class AbstractAgeKeyReader implements AgeKeyReader { + + private final AgeKeyIndicator ageKeyIndicator; + + /** + * Key Reader with prefix for initial line filtering and pattern for subsequent matching + * + * @param ageKeyIndicator Key Indicator + */ + AbstractAgeKeyReader(final AgeKeyIndicator ageKeyIndicator) { + this.ageKeyIndicator = ageKeyIndicator; + } + + /** + * Read keys from Input Stream based on lines starting with indicated prefix and matched against configured pattern + * + * @param inputStream Input Stream to be parsed + * @return Parsed key objects + * @throws IOException Thrown on failures reading Input Stream + */ + @Override + public List read(final InputStream inputStream) throws IOException { + Objects.requireNonNull(inputStream, "Input Stream required"); + + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + final Set keys = reader.lines() + .filter(line -> line.startsWith(ageKeyIndicator.getPrefix())) + .map(ageKeyIndicator.getPattern()::matcher) + .filter(Matcher::matches) + .map(Matcher::group) + .collect(Collectors.toSet()); + + return readKeys(keys); + } + } + + /** + * Read encoded keys filtered from Input Stream based on matched pattern + * + * @param keys Encoded keys + * @return Parsed key objects + * @throws IOException Thrown on failures reading encoded keys + */ + protected abstract List readKeys(Set keys) throws IOException; +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyIndicator.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyIndicator.java new file mode 100644 index 0000000000..0ab6b3baca --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyIndicator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import java.util.regex.Pattern; + +/** + * Pattern indicators for age-encryption.org public and private keys + */ +public enum AgeKeyIndicator { + /** Human-Readable Part with Separator and enumeration of allowed Bech32 uppercase characters from BIP 0173 */ + PRIVATE_KEY("AGE-SECRET-KEY-1", Pattern.compile("^AGE-SECRET-KEY-1[QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L]{58}$")), + + /** Human-Readable Part with Separator and enumeration of allowed Bech32 lowercase characters from BIP 0173 */ + PUBLIC_KEY("age1", Pattern.compile("^age1[qpzry9x8gf2tvdw0s3jn54khce6mua7l]{58}$")); + + private final String prefix; + + private final Pattern pattern; + + AgeKeyIndicator(final String prefix, final Pattern pattern) { + this.prefix = prefix; + this.pattern = pattern; + } + + public String getPrefix() { + return prefix; + } + + public Pattern getPattern() { + return pattern; + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyReader.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyReader.java new file mode 100644 index 0000000000..b4e7070588 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +/** + * Abstraction for reading public or private keys encoded using Bech32 supporting age-encryption.org + * + * @param Key Type + */ +public interface AgeKeyReader { + /** + * Read Input Stream and return collection of parsed objects corresponding to parsed keys + * + * @param inputStream Input Stream to be parsed + * @return Collection of parsed objects + * @throws IOException Thrown on failure to read Input Stream + */ + List read(InputStream inputStream) throws IOException; +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyValidator.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyValidator.java new file mode 100644 index 0000000000..b4f25f7946 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeKeyValidator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.resource.ResourceReference; +import org.apache.nifi.components.resource.ResourceReferences; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.components.resource.StandardResourceReferences; +import org.apache.nifi.components.resource.Utf8TextResource; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +/** + * Component Property Validator for age-encryption X25519 keys encoded using Bech32 + */ +public class AgeKeyValidator implements Validator { + private static final String FILE_RESOURCE_EXPLANATION = "File resource validation passed"; + + private static final String RESOURCE_EXCEPTION = "Read failed: %s"; + + private static final String NOT_FOUND_EXPLANATION = "Encoded keys not found"; + + private static final String INVALID_EXPLANATION = "Invalid keys found [%d]"; + + private static final String VALID_EXPLANATION = "Valid keys found"; + + private static final String EXPLANATION_SEPARATOR = " and "; + + private final AgeKeyIndicator ageKeyIndicator; + /** + * Key Validator with prefix for initial line filtering and pattern for subsequent matching + * + * @param ageKeyIndicator Key Indicator + */ + public AgeKeyValidator(final AgeKeyIndicator ageKeyIndicator) { + this.ageKeyIndicator = ageKeyIndicator; + } + + /** + * Validate property reads one or more lines from a newline-delimited string and requires at least one valid key + * + * @param subject Property to be validated + * @param input Property Value to be validated + * @param context Validation Context for property resolution + * @return Validation Result based on finding at least one valid key + */ + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final PropertyValue propertyValue = getPropertyValue(subject, context); + ResourceReferences resources = propertyValue.asResources(); + + if (resources == null) { + final ResourceReference resourceReference = new Utf8TextResource(input); + resources = new StandardResourceReferences(Collections.singletonList(resourceReference)); + } + + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject); + + final Set results = resources.asList().stream().map(this::validateResource).collect(Collectors.toSet()); + final String invalidExplanation = results.stream() + .filter(Predicate.not(ValidationResult::isValid)) + .map(ValidationResult::getExplanation) + .collect(Collectors.joining(EXPLANATION_SEPARATOR)); + + if (invalidExplanation.isEmpty()) { + builder.explanation(VALID_EXPLANATION).valid(true); + } else { + builder.explanation(invalidExplanation).valid(false); + } + + return builder.build(); + } + + private PropertyValue getPropertyValue(final String subject, final ValidationContext context) { + final Optional propertyFound = context.getProperties() + .keySet() + .stream() + .filter(s -> s.getName().contentEquals(subject)) + .findFirst(); + + final String message = String.format("Property [%s] not found", subject); + final PropertyDescriptor propertyDescriptor = propertyFound.orElseThrow(() -> new IllegalArgumentException(message)); + return context.getProperty(propertyDescriptor); + } + + private ValidationResult validateResource(final ResourceReference resource) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + + final ResourceType resourceType = resource.getResourceType(); + if (ResourceType.TEXT == resourceType) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resource.read()))) { + final Set prefixedLines = reader.lines() + .filter(line -> line.startsWith(ageKeyIndicator.getPrefix())) + .collect(Collectors.toSet()); + + final long invalid = prefixedLines.stream() + .map(ageKeyIndicator.getPattern()::matcher) + .filter(Predicate.not(Matcher::matches)) + .count(); + + if (prefixedLines.size() == 0) { + builder.explanation(NOT_FOUND_EXPLANATION).valid(false); + } else if (invalid == 0) { + builder.explanation(VALID_EXPLANATION).valid(true); + } else { + final String explanation = String.format(INVALID_EXPLANATION, invalid); + builder.explanation(explanation).valid(false); + } + } catch (final Exception e) { + final String explanation = String.format(RESOURCE_EXCEPTION, e.getMessage()); + builder.explanation(explanation).valid(false); + } + } else { + builder.explanation(FILE_RESOURCE_EXPLANATION).valid(true); + } + + return builder.build(); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReader.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReader.java new file mode 100644 index 0000000000..a89b6ac1c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import com.exceptionfactory.jagged.RecipientStanzaReader; +import com.exceptionfactory.jagged.x25519.X25519RecipientStanzaReaderFactory; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * X25519 Private Key implementation age Key Reader + */ +public class AgePrivateKeyReader extends AbstractAgeKeyReader { + private static final Provider KEY_PROVIDER = AgeProviderResolver.getKeyProvider().orElse(null); + + public AgePrivateKeyReader() { + super(AgeKeyIndicator.PRIVATE_KEY); + } + + /** + * Read private keys and return Recipient Stanza Writers + * + * @param keys Set of private keys + * @return Recipient Stanza Writers + * @throws IOException Thrown on failures parsing private keys + */ + @Override + protected List readKeys(final Set keys) throws IOException { + final List recipientStanzaReaders = new ArrayList<>(); + for (final String encodedPrivateKey : keys) { + try { + final RecipientStanzaReader recipientStanzaReader = getRecipientStanzaReader(encodedPrivateKey); + recipientStanzaReaders.add(recipientStanzaReader); + } catch (final Exception e) { + throw new IOException("Parsing Private Key Identities failed", e); + } + } + + return recipientStanzaReaders; + } + + private RecipientStanzaReader getRecipientStanzaReader(final String encodedPrivateKey) throws GeneralSecurityException { + final RecipientStanzaReader recipientStanzaReader; + + if (KEY_PROVIDER == null) { + recipientStanzaReader = X25519RecipientStanzaReaderFactory.newRecipientStanzaReader(encodedPrivateKey); + } else { + recipientStanzaReader = X25519RecipientStanzaReaderFactory.newRecipientStanzaReader(encodedPrivateKey, KEY_PROVIDER); + } + + return recipientStanzaReader; + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeProviderResolver.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeProviderResolver.java new file mode 100644 index 0000000000..c7ca0bb27f --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgeProviderResolver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import org.bouncycastle.jce.provider.BouncyCastleProvider; + +import java.security.Provider; +import java.security.Security; +import java.util.Optional; + +/** + * Resolver abstraction for age-encryption Security Algorithm Providers + */ +public final class AgeProviderResolver { + private static final String KEY_ALGORITHM_FILTER = "KeyFactory.X25519"; + + private static final String CIPHER_ALGORITHM_FILTER = "Cipher.ChaCha20-Poly1305"; + + private AgeProviderResolver() { + + } + + /** + * Get Java Security Provider supporting ChaCha20-Poly1305 + * + * @return Provider supporting ChaCha20-Poly1305 Cipher that defaults to Bouncy Castle when no other Providers found + */ + public static Provider getCipherProvider() { + final Provider cipherProvider; + + final Provider[] providers = Security.getProviders(CIPHER_ALGORITHM_FILTER); + if (providers == null) { + cipherProvider = new BouncyCastleProvider(); + } else { + cipherProvider = providers[0]; + } + + return cipherProvider; + } + + /** + * Get available Java Security Provider supporting X25519 and ChaCha20-Poly1305 when registered Provider not found + * + * @return Empty Provider indicating platform support for X25519 Key Factory or Bouncy Castle when no other Providers found + */ + public static Optional getKeyProvider() { + final Provider keyProvider; + + final Provider[] providers = Security.getProviders(KEY_ALGORITHM_FILTER); + if (providers == null) { + keyProvider = new BouncyCastleProvider(); + } else { + keyProvider = null; + } + + return Optional.ofNullable(keyProvider); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReader.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReader.java new file mode 100644 index 0000000000..ac0694d680 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import com.exceptionfactory.jagged.RecipientStanzaWriter; +import com.exceptionfactory.jagged.x25519.X25519RecipientStanzaWriterFactory; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * X25519 Public Key implementation age Key Reader + */ +public class AgePublicKeyReader extends AbstractAgeKeyReader { + private static final Provider KEY_PROVIDER = AgeProviderResolver.getKeyProvider().orElse(null); + + public AgePublicKeyReader() { + super(AgeKeyIndicator.PUBLIC_KEY); + } + + /** + * Read public keys and return Recipient Stanza Writers + * + * @param keys Set of public keys + * @return Recipient Stanza Writers + * @throws IOException Thrown on failure to parse public keys + */ + @Override + protected List readKeys(final Set keys) throws IOException { + final List recipientStanzaWriters = new ArrayList<>(); + for (final String encodedPublicKey : keys) { + try { + final RecipientStanzaWriter recipientStanzaWriter = getRecipientStanzaWriter(encodedPublicKey); + recipientStanzaWriters.add(recipientStanzaWriter); + } catch (final Exception e) { + throw new IOException("Parsing Public Key Recipients failed", e); + } + } + + return recipientStanzaWriters; + } + + private RecipientStanzaWriter getRecipientStanzaWriter(final String encodedPublicKey) throws GeneralSecurityException { + final RecipientStanzaWriter recipientStanzaWriter; + + if (KEY_PROVIDER == null) { + recipientStanzaWriter = X25519RecipientStanzaWriterFactory.newRecipientStanzaWriter(encodedPublicKey); + } else { + recipientStanzaWriter = X25519RecipientStanzaWriterFactory.newRecipientStanzaWriter(encodedPublicKey, KEY_PROVIDER); + } + + return recipientStanzaWriter; + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/FileEncoding.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/FileEncoding.java new file mode 100644 index 0000000000..5be8ef94ed --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/FileEncoding.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import org.apache.nifi.components.DescribedValue; + +/** + * File Encoding options supporting Binary or ASCII Armor + */ +public enum FileEncoding implements DescribedValue { + BINARY("Binary encoding"), + + ASCII("ASCII Armor encoding"); + + private final String description; + + FileEncoding(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/KeySource.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/KeySource.java new file mode 100644 index 0000000000..8b5e762f2b --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/age/KeySource.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import org.apache.nifi.components.DescribedValue; + +/** + * Key resource loading strategy + */ +public enum KeySource implements DescribedValue { + PROPERTIES("Load one or more keys from configured properties"), + + RESOURCES("Load one or more keys from files or URLs"); + + private final String description; + + KeySource(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallback.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallback.java new file mode 100644 index 0000000000..a43fd6d710 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallback.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.io; + +import org.apache.nifi.processor.io.StreamCallback; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +/** + * Channel Stream Callback wraps Java IO Streams in Java NIO Channels with methods for customization of channels + */ +public class ChannelStreamCallback implements StreamCallback { + private static final int END_OF_FILE = -1; + + private final int bufferCapacity; + + /** + * Channel Stream Callback with configurable buffer capacity for transferring bytes + * + * @param bufferCapacity Buffer capacity in bytes for transferring between channels + */ + public ChannelStreamCallback(final int bufferCapacity) { + this.bufferCapacity = bufferCapacity; + } + + /** + * Process streams using NIO Channels with configured Buffer + * + * @param inputStream Input Stream to be wrapped using a Readable Byte Channel + * @param outputStream Output Stream to be wrapped using a Writable Byte Channel + * @throws IOException Thrown on read or write failures + */ + @Override + public void process(final InputStream inputStream, final OutputStream outputStream) throws IOException { + try ( + final ReadableByteChannel readableByteChannel = getReadableChannel(inputStream); + final WritableByteChannel outputChannel = getWritableChannel(outputStream) + ) { + final ByteBuffer buffer = ByteBuffer.allocate(bufferCapacity); + while (readableByteChannel.read(buffer) != END_OF_FILE) { + buffer.flip(); + while (buffer.hasRemaining()) { + outputChannel.write(buffer); + } + buffer.clear(); + } + + buffer.flip(); + if (buffer.hasRemaining()) { + while (buffer.hasRemaining()) { + outputChannel.write(buffer); + } + } + } + } + + /** + * Get Readable Byte Channel defaults to using Channels.newChannel() + * + * @param inputStream Input Stream to be wrapped + * @return Readable Byte Channel wrapping provided Input Stream + * @throws IOException Throw on channel initialization failures + */ + protected ReadableByteChannel getReadableChannel(final InputStream inputStream) throws IOException { + return Channels.newChannel(inputStream); + } + + /** + * Get Writable Byte Channel defaults to using Channels.newChannel() + * + * @param outputStream Output Stream to be wrapped + * @return Writable Byte Channel wrapping provided Output Stream + * @throws IOException Thrown on channel initialization failures + */ + protected WritableByteChannel getWritableChannel(final OutputStream outputStream) throws IOException { + return Channels.newChannel(outputStream); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9952db3a9f..b3c1c3772c 100644 --- a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,4 +14,6 @@ # limitations under the License. org.apache.nifi.processors.cipher.DecryptContentCompatibility org.apache.nifi.processors.cipher.DecryptContent +org.apache.nifi.processors.cipher.DecryptContentAge +org.apache.nifi.processors.cipher.EncryptContentAge org.apache.nifi.processors.cipher.VerifyContentMAC diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/DecryptContentAgeTest.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/DecryptContentAgeTest.java new file mode 100644 index 0000000000..805c136f0b --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/DecryptContentAgeTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher; + +import com.exceptionfactory.jagged.EncryptingChannelFactory; +import com.exceptionfactory.jagged.RecipientStanzaWriter; +import com.exceptionfactory.jagged.framework.armor.ArmoredEncryptingChannelFactory; +import com.exceptionfactory.jagged.framework.stream.StandardEncryptingChannelFactory; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processors.cipher.age.AgeProviderResolver; +import org.apache.nifi.processors.cipher.age.AgePublicKeyReader; +import org.apache.nifi.processors.cipher.age.FileEncoding; +import org.apache.nifi.processors.cipher.age.KeySource; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.opentest4j.AssertionFailedError; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DecryptContentAgeTest { + + private static final byte[] WORD = {'W', 'O', 'R', 'D'}; + + private static final String CONTENT_WORD = new String(WORD); + + private static final String PRIVATE_KEY_CHECKSUM_INVALID = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFFF"; + + private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN"; + + private static final String PRIVATE_KEY_SECOND = "AGE-SECRET-KEY-1AU0T8M9GWJ4PEQK9TGS54T6VHRL8DLFTZ7AWYJDFTLMDZZWZQKDSA8K882"; + + private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg"; + + private static final String KEY_FILE_SUFFIX = ".key"; + + private TestRunner runner; + + @BeforeEach + void setRunner() { + runner = TestRunners.newTestRunner(DecryptContentAge.class); + } + + @Test + void testRequiredProperties() { + runner.assertNotValid(); + + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED); + + runner.assertValid(); + } + + @Test + void testPrivateKeyNotValid() { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PUBLIC_KEY_ENCODED); + + runner.assertNotValid(); + } + + @Test + void testVerifySuccessful() { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.SUCCESSFUL); + } + + @Test + void testVerifyFailedChecksumInvalid() { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_CHECKSUM_INVALID); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED); + } + + @Test + void testVerifyFailedResourcesNotConfigured() { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED); + } + + @Test + void testRunSuccessAscii() throws GeneralSecurityException, IOException { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertSuccess(FileEncoding.ASCII); + } + + @Test + void testRunSuccessBinary() throws GeneralSecurityException, IOException { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertSuccess(FileEncoding.BINARY); + } + + @Test + void testRunSuccessBinaryMultiplePrivateKeys() throws GeneralSecurityException, IOException { + final String multiplePrivateKeys = String.format("%s%n%s", PRIVATE_KEY_ENCODED, PRIVATE_KEY_SECOND); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, multiplePrivateKeys); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertSuccess(FileEncoding.BINARY); + } + + @Test + void testRunSuccessBinaryKeySourceResources(@TempDir final Path tempDir) throws GeneralSecurityException, IOException { + final Path tempFile = createTempFile(tempDir); + Files.writeString(tempFile, PRIVATE_KEY_ENCODED); + + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITY_RESOURCES, tempFile.toString()); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue()); + + assertSuccess(FileEncoding.BINARY); + } + + @Test + void testRunScheduledErrorPrivateKeyNotFound(@TempDir final Path tempDir) throws GeneralSecurityException, IOException { + final Path tempFile = createTempFile(tempDir); + + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITY_RESOURCES, tempFile.toString()); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue()); + + final byte[] encrypted = getEncrypted(FileEncoding.BINARY); + runner.enqueue(encrypted); + + assertThrows(AssertionFailedError.class, runner::run); + } + + @Test + void testRunFailure() { + runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED); + runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + runner.enqueue(WORD); + runner.run(); + + runner.assertAllFlowFilesTransferred(DecryptContentAge.FAILURE); + } + + private void assertSuccess(final FileEncoding fileEncoding) throws GeneralSecurityException, IOException { + final byte[] encrypted = getEncrypted(fileEncoding); + runner.enqueue(encrypted); + runner.run(); + + runner.assertAllFlowFilesTransferred(DecryptContentAge.SUCCESS); + + final Iterator flowFiles = runner.getFlowFilesForRelationship(DecryptContentAge.SUCCESS).iterator(); + assertTrue(flowFiles.hasNext()); + final MockFlowFile flowFile = flowFiles.next(); + + final String content = flowFile.getContent(); + assertEquals(CONTENT_WORD, content); + } + + private void assertVerificationResultOutcomeEquals(final ConfigVerificationResult.Outcome expected) { + final VerifiableProcessor processor = (VerifiableProcessor) runner.getProcessor(); + final Iterator results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()).iterator(); + + assertTrue(results.hasNext()); + final ConfigVerificationResult result = results.next(); + assertEquals(expected, result.getOutcome()); + } + + private byte[] getEncrypted(final FileEncoding fileEncoding) throws GeneralSecurityException, IOException { + final EncryptingChannelFactory encryptingChannelFactory = getEncryptingChannelFactory(fileEncoding); + final AgePublicKeyReader publicKeyReader = new AgePublicKeyReader(); + final ByteArrayInputStream encodedInputStream = new ByteArrayInputStream(PUBLIC_KEY_ENCODED.getBytes(StandardCharsets.UTF_8)); + final List recipientStanzaWriters = publicKeyReader.read(encodedInputStream); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final WritableByteChannel outputChannel = Channels.newChannel(outputStream); + try (final WritableByteChannel encryptingChannel = encryptingChannelFactory.newEncryptingChannel(outputChannel, recipientStanzaWriters)) { + final ByteBuffer buffer = ByteBuffer.wrap(WORD); + encryptingChannel.write(buffer); + } + + return outputStream.toByteArray(); + } + + private EncryptingChannelFactory getEncryptingChannelFactory(final FileEncoding fileEncoding) { + final Provider provider = AgeProviderResolver.getCipherProvider(); + final EncryptingChannelFactory encryptingChannelFactory; + if (FileEncoding.ASCII == fileEncoding) { + encryptingChannelFactory = new ArmoredEncryptingChannelFactory(provider); + } else { + encryptingChannelFactory = new StandardEncryptingChannelFactory(provider); + } + return encryptingChannelFactory; + } + + private Path createTempFile(final Path tempDir) throws IOException { + return Files.createTempFile(tempDir, DecryptContentAgeTest.class.getSimpleName(), KEY_FILE_SUFFIX); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/EncryptContentAgeTest.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/EncryptContentAgeTest.java new file mode 100644 index 0000000000..16655ce0d9 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/EncryptContentAgeTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher; + +import com.exceptionfactory.jagged.DecryptingChannelFactory; +import com.exceptionfactory.jagged.RecipientStanzaReader; +import com.exceptionfactory.jagged.framework.armor.ArmoredDecryptingChannelFactory; +import com.exceptionfactory.jagged.framework.stream.StandardDecryptingChannelFactory; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processors.cipher.age.AgePrivateKeyReader; +import org.apache.nifi.processors.cipher.age.AgeProviderResolver; +import org.apache.nifi.processors.cipher.age.FileEncoding; +import org.apache.nifi.processors.cipher.age.KeySource; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.opentest4j.AssertionFailedError; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.GeneralSecurityException; +import java.security.Provider; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class EncryptContentAgeTest { + + private static final String BINARY_VERSION = "age-encryption.org"; + + private static final String ASCII_HEADER = "-----BEGIN AGE ENCRYPTED FILE-----"; + + private static final byte[] WORD = {'W', 'O', 'R', 'D'}; + + private static final int BUFFER_CAPACITY = 20; + + private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN"; + + private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg"; + + private static final String PUBLIC_KEY_CHECKSUM_INVALID = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn000"; + + private static final String KEY_FILE_SUFFIX = ".key"; + + private TestRunner runner; + + @BeforeEach + void setRunner() { + runner = TestRunners.newTestRunner(EncryptContentAge.class); + } + + @Test + void testRequiredProperties() { + runner.assertNotValid(); + + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED); + + runner.assertValid(); + } + + @Test + void testPublicKeyNotValid() { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PRIVATE_KEY_ENCODED); + + runner.assertNotValid(); + } + + @Test + void testVerifySuccessful() { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.SUCCESSFUL); + } + + @Test + void testVerifyFailedChecksumInvalid() { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_CHECKSUM_INVALID); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED); + } + + @Test + void testVerifyFailedResourcesNotFound() { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue()); + + assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED); + } + + @Test + void testRunSuccessAscii() throws GeneralSecurityException, IOException { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.ASCII.getValue()); + + runner.enqueue(WORD); + runner.run(); + + runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS); + assertAsciiFound(); + } + + @Test + void testRunSuccessBinary() throws GeneralSecurityException, IOException { + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue()); + runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.BINARY.getValue()); + + runner.enqueue(WORD); + runner.run(); + + runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS); + assertBinaryFound(); + } + + @Test + void testRunSuccessBinaryKeySourceResources(@TempDir final Path tempDir) throws GeneralSecurityException, IOException { + final Path tempFile = createTempFile(tempDir); + Files.writeString(tempFile, PUBLIC_KEY_ENCODED); + + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENT_RESOURCES, tempFile.toString()); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue()); + runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.BINARY.getValue()); + + runner.enqueue(WORD); + runner.run(); + + runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS); + assertBinaryFound(); + } + + @Test + void testRunScheduledErrorPublicKeyNotFound(@TempDir final Path tempDir) throws IOException { + final Path tempFile = createTempFile(tempDir); + + runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENT_RESOURCES, tempFile.toString()); + runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue()); + + runner.enqueue(WORD); + + assertThrows(AssertionFailedError.class, runner::run); + } + + private void assertBinaryFound() throws GeneralSecurityException, IOException { + final Iterator flowFiles = runner.getFlowFilesForRelationship(EncryptContentAge.SUCCESS).iterator(); + + assertTrue(flowFiles.hasNext()); + + final MockFlowFile flowFile = flowFiles.next(); + final String content = flowFile.getContent(); + + assertTrue(content.startsWith(BINARY_VERSION)); + + final byte[] decrypted = getDecrypted(FileEncoding.BINARY, flowFile.getContentStream()); + assertArrayEquals(WORD, decrypted); + } + + private void assertAsciiFound() throws GeneralSecurityException, IOException { + final Iterator flowFiles = runner.getFlowFilesForRelationship(EncryptContentAge.SUCCESS).iterator(); + + assertTrue(flowFiles.hasNext()); + + final MockFlowFile flowFile = flowFiles.next(); + final String content = flowFile.getContent(); + + assertTrue(content.startsWith(ASCII_HEADER)); + + final byte[] decrypted = getDecrypted(FileEncoding.ASCII, flowFile.getContentStream()); + assertArrayEquals(WORD, decrypted); + } + + private void assertVerificationResultOutcomeEquals(final ConfigVerificationResult.Outcome expected) { + final VerifiableProcessor processor = (VerifiableProcessor) runner.getProcessor(); + final Iterator results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()).iterator(); + + assertTrue(results.hasNext()); + final ConfigVerificationResult result = results.next(); + assertEquals(expected, result.getOutcome()); + } + + private byte[] getDecrypted(final FileEncoding fileEncoding, final InputStream inputStream) throws GeneralSecurityException, IOException { + final DecryptingChannelFactory decryptingChannelFactory = getDecryptingChannelFactory(fileEncoding); + final AgePrivateKeyReader privateKeyReader = new AgePrivateKeyReader(); + final ByteArrayInputStream encodedInputStream = new ByteArrayInputStream(PRIVATE_KEY_ENCODED.getBytes(StandardCharsets.UTF_8)); + final List recipientStanzaReaders = privateKeyReader.read(encodedInputStream); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final WritableByteChannel outputChannel = Channels.newChannel(outputStream); + try (final ReadableByteChannel decryptingChannel = decryptingChannelFactory.newDecryptingChannel(Channels.newChannel(inputStream), recipientStanzaReaders)) { + final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_CAPACITY); + decryptingChannel.read(buffer); + buffer.flip(); + outputChannel.write(buffer); + } + + return outputStream.toByteArray(); + } + + private DecryptingChannelFactory getDecryptingChannelFactory(final FileEncoding fileEncoding) { + final Provider provider = AgeProviderResolver.getCipherProvider(); + final DecryptingChannelFactory decryptingChannelFactory; + if (FileEncoding.ASCII == fileEncoding) { + decryptingChannelFactory = new ArmoredDecryptingChannelFactory(provider); + } else { + decryptingChannelFactory = new StandardDecryptingChannelFactory(provider); + } + return decryptingChannelFactory; + } + + private Path createTempFile(final Path tempDir) throws IOException { + return Files.createTempFile(tempDir, EncryptContentAgeTest.class.getSimpleName(), KEY_FILE_SUFFIX); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReaderTest.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReaderTest.java new file mode 100644 index 0000000000..70acad6bf3 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePrivateKeyReaderTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import com.exceptionfactory.jagged.RecipientStanzaReader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AgePrivateKeyReaderTest { + private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN"; + + private static final String PRIVATE_KEY_CHECKSUM_INVALID = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFFF"; + + private AgePrivateKeyReader reader; + + @BeforeEach + void setReader() { + reader = new AgePrivateKeyReader(); + } + + @Test + void testRead() throws IOException { + final InputStream inputStream = new ByteArrayInputStream(PRIVATE_KEY_ENCODED.getBytes(StandardCharsets.UTF_8)); + + final Iterator recipientStanzaReaders = reader.read(inputStream).iterator(); + + assertTrue(recipientStanzaReaders.hasNext()); + + final RecipientStanzaReader recipientStanzaReader = recipientStanzaReaders.next(); + assertNotNull(recipientStanzaReader); + + assertFalse(recipientStanzaReaders.hasNext()); + } + + @Test + void testReadChecksumInvalid() { + final InputStream inputStream = new ByteArrayInputStream(PRIVATE_KEY_CHECKSUM_INVALID.getBytes(StandardCharsets.UTF_8)); + + assertThrows(IOException.class, () -> reader.read(inputStream)); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReaderTest.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReaderTest.java new file mode 100644 index 0000000000..c44d677b3d --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/age/AgePublicKeyReaderTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.age; + +import com.exceptionfactory.jagged.RecipientStanzaWriter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AgePublicKeyReaderTest { + private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg"; + + private static final String PUBLIC_KEY_CHECKSUM_INVALID = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn000"; + + private AgePublicKeyReader reader; + + @BeforeEach + void setReader() { + reader = new AgePublicKeyReader(); + } + + @Test + void testRead() throws IOException { + final InputStream inputStream = new ByteArrayInputStream(PUBLIC_KEY_ENCODED.getBytes(StandardCharsets.UTF_8)); + + final Iterator recipientStanzaWriters = reader.read(inputStream).iterator(); + + assertTrue(recipientStanzaWriters.hasNext()); + + final RecipientStanzaWriter recipientStanzaWriter = recipientStanzaWriters.next(); + assertNotNull(recipientStanzaWriter); + + assertFalse(recipientStanzaWriters.hasNext()); + } + + @Test + void testReadChecksumInvalid() { + final InputStream inputStream = new ByteArrayInputStream(PUBLIC_KEY_CHECKSUM_INVALID.getBytes(StandardCharsets.UTF_8)); + + assertThrows(IOException.class, () -> reader.read(inputStream)); + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallbackTest.java b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallbackTest.java new file mode 100644 index 0000000000..0f01d7b861 --- /dev/null +++ b/nifi-nar-bundles/nifi-cipher-bundle/nifi-cipher-processors/src/test/java/org/apache/nifi/processors/cipher/io/ChannelStreamCallbackTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cipher.io; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +class ChannelStreamCallbackTest { + private static final int BUFFER_CAPACITY = 8192; + + private static final int BUFFER_CAPACITY_DOUBLED = 16384; + + private static final byte BYTE = 7; + + private static final int END_OF_FILE = -1; + + @Test + void testProcessEmpty() throws IOException { + final byte[] expected = new byte[]{}; + + assertProcessSuccess(expected); + } + + @Test + void testProcessSingleByte() throws IOException { + final byte[] expected = new byte[]{BYTE}; + + assertProcessSuccess(expected); + } + + @Test + void testProcessBufferCapacity() throws IOException { + final byte[] expected = new byte[BUFFER_CAPACITY]; + Arrays.fill(expected, BYTE); + + assertProcessSuccess(expected); + } + + @Test + void testProcessBufferCapacityDoubled() throws IOException { + final byte[] expected = new byte[BUFFER_CAPACITY_DOUBLED]; + Arrays.fill(expected, BYTE); + + assertProcessSuccess(expected); + } + + @Test + void testProcessReadBufferRemaining() throws IOException { + final ChannelStreamCallback callback = new BufferRemainingChannelStreamCallback(); + + final byte[] expected = new byte[]{BYTE}; + final ByteArrayInputStream inputStream = new ByteArrayInputStream(expected); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + callback.process(inputStream, outputStream); + + assertArrayEquals(expected, outputStream.toByteArray()); + } + + private void assertProcessSuccess(final byte[] expected) throws IOException { + final ChannelStreamCallback callback = new ChannelStreamCallback(BUFFER_CAPACITY); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(expected); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + callback.process(inputStream, outputStream); + + assertArrayEquals(expected, outputStream.toByteArray()); + } + + private static class BufferRemainingChannelStreamCallback extends ChannelStreamCallback { + + public BufferRemainingChannelStreamCallback() { + super(BUFFER_CAPACITY); + } + + protected ReadableByteChannel getReadableChannel(final InputStream inputStream) { + return new BufferRemainingReadableByteChannel(); + } + } + + private static class BufferRemainingReadableByteChannel implements ReadableByteChannel { + + @Override + public int read(final ByteBuffer buffer) { + buffer.put(BYTE); + return END_OF_FILE; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() { + + } + } +} diff --git a/nifi-nar-bundles/nifi-cipher-bundle/pom.xml b/nifi-nar-bundles/nifi-cipher-bundle/pom.xml index 5179e40b23..ebeb4b460a 100644 --- a/nifi-nar-bundles/nifi-cipher-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cipher-bundle/pom.xml @@ -29,4 +29,16 @@ nifi-cipher-processors nifi-cipher-nar + + + + + com.exceptionfactory.jagged + jagged-bom + 0.2.0 + import + pom + + +