NIFI-12033 Added EncryptContentAge and DecryptContentAge Processors

This closes #7676
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2023-09-09 09:38:54 -05:00 committed by Paul Grey
parent 9c19656f5b
commit ebe8b9a2e7
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
20 changed files with 2077 additions and 0 deletions

View File

@ -43,6 +43,18 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>com.exceptionfactory.jagged</groupId>
<artifactId>jagged-x25519</artifactId>
</dependency>
<dependency>
<groupId>com.exceptionfactory.jagged</groupId>
<artifactId>jagged-framework</artifactId>
</dependency>
<dependency>
<groupId>com.exceptionfactory.jagged</groupId>
<artifactId>jagged-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher;
import com.exceptionfactory.jagged.DecryptingChannelFactory;
import com.exceptionfactory.jagged.RecipientStanzaReader;
import com.exceptionfactory.jagged.framework.armor.ArmoredDecryptingChannelFactory;
import com.exceptionfactory.jagged.framework.stream.StandardDecryptingChannelFactory;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.cipher.age.AgeKeyIndicator;
import org.apache.nifi.processors.cipher.age.AgeKeyReader;
import org.apache.nifi.processors.cipher.age.AgeKeyValidator;
import org.apache.nifi.processors.cipher.age.AgePrivateKeyReader;
import org.apache.nifi.processors.cipher.age.AgeProviderResolver;
import org.apache.nifi.processors.cipher.age.KeySource;
import org.apache.nifi.processors.cipher.io.ChannelStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"age", "age-encryption.org", "encryption", "ChaCha20-Poly1305", "X25519"})
@CapabilityDescription(
"Decrypt content using the age-encryption.org/v1 specification. " +
"Detects binary or ASCII armored content encoding using the initial file header bytes. " +
"The age standard uses ChaCha20-Poly1305 for authenticated encryption of the payload. " +
"The age-keygen command supports generating X25519 key pairs for encryption and decryption operations."
)
@SeeAlso({ EncryptContentAge.class })
public class DecryptContentAge extends AbstractProcessor implements VerifiableProcessor {
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Decryption Completed")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("Decryption Failed")
.build();
static final PropertyDescriptor PRIVATE_KEY_SOURCE = new PropertyDescriptor.Builder()
.name("Private Key Source")
.displayName("Private Key Source")
.description("Source of information determines the loading strategy for X25519 Private Key Identities")
.required(true)
.defaultValue(KeySource.PROPERTIES.getValue())
.allowableValues(KeySource.class)
.build();
static final PropertyDescriptor PRIVATE_KEY_IDENTITIES = new PropertyDescriptor.Builder()
.name("Private Key Identities")
.displayName("Private Key Identities")
.description("One or more X25519 Private Key Identities, separated with newlines, encoded according to the age specification, starting with AGE-SECRET-KEY-1")
.required(true)
.sensitive(true)
.addValidator(new AgeKeyValidator(AgeKeyIndicator.PRIVATE_KEY))
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT)
.dependsOn(PRIVATE_KEY_SOURCE, KeySource.PROPERTIES)
.build();
static final PropertyDescriptor PRIVATE_KEY_IDENTITY_RESOURCES = new PropertyDescriptor.Builder()
.name("Private Key Identity Resources")
.displayName("Private Key Identity Resources")
.description("One or more files or URLs containing X25519 Private Key Identities, separated with newlines, encoded according to the age specification, starting with AGE-SECRET-KEY-1")
.required(true)
.addValidator(new AgeKeyValidator(AgeKeyIndicator.PRIVATE_KEY))
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.URL)
.dependsOn(PRIVATE_KEY_SOURCE, KeySource.RESOURCES)
.build();
private static final Set<Relationship> RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE));
private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
PRIVATE_KEY_SOURCE,
PRIVATE_KEY_IDENTITIES,
PRIVATE_KEY_IDENTITY_RESOURCES
);
private static final AgeKeyReader<RecipientStanzaReader> PRIVATE_KEY_READER = new AgePrivateKeyReader();
private static final Provider CIPHER_PROVIDER = AgeProviderResolver.getCipherProvider();
/** 64 Kilobyte buffer plus 16 bytes for authentication tag aligns with age-encryption payload chunk sizing */
private static final int BUFFER_CAPACITY = 65552;
/** age-encryption.org version indicator at the beginning of binary encoded files */
private static final String VERSION_INDICATOR = "age-encryption.org";
private static final byte[] BINARY_VERSION_INDICATOR = VERSION_INDICATOR.getBytes(StandardCharsets.US_ASCII);
private static final int INPUT_BUFFER_SIZE = BINARY_VERSION_INDICATOR.length;
private static final String KEY_VERIFICATION_STEP = "Verify Private Key Identities";
private static final String NOT_FOUND_EXPLANATION = "Private Key Identities not found";
private volatile List<RecipientStanzaReader> configuredRecipientStanzaReaders = Collections.emptyList();
/**
* Get Relationships
*
* @return Processor Relationships
*/
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
/**
* Get Supported Property Descriptors
*
* @return Processor Supported Property Descriptors
*/
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
/**
* Verify Private Key Identities
*
* @param context Process Context with configured properties
* @param verificationLogger Logger for writing verification results
* @param attributes Sample FlowFile attributes for property value resolution
* @return Configuration Verification Results
*/
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
final ConfigVerificationResult.Builder verificationBuilder = new ConfigVerificationResult.Builder()
.verificationStepName(KEY_VERIFICATION_STEP);
try {
final List<RecipientStanzaReader> recipientStanzaReaders = getRecipientStanzaReaders(context);
if (recipientStanzaReaders.isEmpty()) {
verificationLogger.warn(NOT_FOUND_EXPLANATION);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(NOT_FOUND_EXPLANATION);
} else {
final String explanation = String.format("Private Key Identities found: %d", recipientStanzaReaders.size());
verificationLogger.info(explanation);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(explanation);
}
} catch (final Exception e) {
final String explanation = String.format("%s: %s", NOT_FOUND_EXPLANATION, e);
verificationLogger.warn(NOT_FOUND_EXPLANATION, e);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(explanation);
}
results.add(verificationBuilder.build());
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
configuredRecipientStanzaReaders = getRecipientStanzaReaders(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
final StreamCallback streamCallback = new DecryptingStreamCallback(configuredRecipientStanzaReaders);
flowFile = session.write(flowFile, streamCallback);
session.transfer(flowFile, SUCCESS);
} catch (final Exception e) {
getLogger().error("Decryption Failed {}", flowFile, e);
session.transfer(flowFile, FAILURE);
}
}
private List<RecipientStanzaReader> getRecipientStanzaReaders(final PropertyContext context) throws IOException {
final KeySource keySource = KeySource.valueOf(context.getProperty(PRIVATE_KEY_SOURCE).getValue());
final List<ResourceReference> resources = new ArrayList<>();
if (KeySource.PROPERTIES == keySource) {
final ResourceReference resource = context.getProperty(PRIVATE_KEY_IDENTITIES).asResource();
resources.add(resource);
} else {
final ResourceReferences resourceReferences = context.getProperty(PRIVATE_KEY_IDENTITY_RESOURCES).asResources();
resources.addAll(resourceReferences.asList());
}
final List<RecipientStanzaReader> recipientStanzaReaders = new ArrayList<>();
for (final ResourceReference resource : resources) {
try (final InputStream inputStream = resource.read()) {
final List<RecipientStanzaReader> readers = PRIVATE_KEY_READER.read(inputStream);
recipientStanzaReaders.addAll(readers);
}
}
if (recipientStanzaReaders.isEmpty()) {
throw new IOException(NOT_FOUND_EXPLANATION);
}
return recipientStanzaReaders;
}
private static class DecryptingStreamCallback extends ChannelStreamCallback {
private final List<RecipientStanzaReader> recipientStanzaReaders;
private DecryptingStreamCallback(final List<RecipientStanzaReader> recipientStanzaReaders) {
super(BUFFER_CAPACITY);
this.recipientStanzaReaders = recipientStanzaReaders;
}
@Override
protected ReadableByteChannel getReadableChannel(final InputStream inputStream) throws IOException {
final PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, INPUT_BUFFER_SIZE);
final byte[] versionIndicator = getVersionIndicator(pushbackInputStream);
final DecryptingChannelFactory decryptingChannelFactory = getDecryptingChannelFactory(versionIndicator);
try {
final ReadableByteChannel inputChannel = super.getReadableChannel(pushbackInputStream);
return decryptingChannelFactory.newDecryptingChannel(inputChannel, recipientStanzaReaders);
} catch (final GeneralSecurityException e) {
throw new IOException("Channel initialization failed", e);
}
}
private byte[] getVersionIndicator(final PushbackInputStream pushbackInputStream) throws IOException {
final byte[] versionIndicator = new byte[INPUT_BUFFER_SIZE];
StreamUtils.fillBuffer(pushbackInputStream, versionIndicator);
pushbackInputStream.unread(versionIndicator);
return versionIndicator;
}
private DecryptingChannelFactory getDecryptingChannelFactory(final byte[] versionIndicator) {
final DecryptingChannelFactory decryptingChannelFactory;
if (Arrays.equals(BINARY_VERSION_INDICATOR, versionIndicator)) {
decryptingChannelFactory = new StandardDecryptingChannelFactory(CIPHER_PROVIDER);
} else {
decryptingChannelFactory = new ArmoredDecryptingChannelFactory(CIPHER_PROVIDER);
}
return decryptingChannelFactory;
}
}
}

View File

@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher;
import com.exceptionfactory.jagged.EncryptingChannelFactory;
import com.exceptionfactory.jagged.RecipientStanzaWriter;
import com.exceptionfactory.jagged.framework.armor.ArmoredEncryptingChannelFactory;
import com.exceptionfactory.jagged.framework.stream.StandardEncryptingChannelFactory;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.cipher.age.AgeKeyIndicator;
import org.apache.nifi.processors.cipher.age.AgeKeyReader;
import org.apache.nifi.processors.cipher.age.AgeKeyValidator;
import org.apache.nifi.processors.cipher.age.AgePublicKeyReader;
import org.apache.nifi.processors.cipher.age.KeySource;
import org.apache.nifi.processors.cipher.io.ChannelStreamCallback;
import org.apache.nifi.processors.cipher.age.FileEncoding;
import org.apache.nifi.processors.cipher.age.AgeProviderResolver;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"age", "age-encryption.org", "encryption", "ChaCha20-Poly1305", "X25519"})
@CapabilityDescription(
"Encrypt content using the age-encryption.org/v1 specification. " +
"Supports binary or ASCII armored content encoding using configurable properties. " +
"The age standard uses ChaCha20-Poly1305 for authenticated encryption of the payload. " +
"The age-keygen command supports generating X25519 key pairs for encryption and decryption operations."
)
@SeeAlso({ DecryptContentAge.class })
public class EncryptContentAge extends AbstractProcessor implements VerifiableProcessor {
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Encryption Completed")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("Encryption Failed")
.build();
static final PropertyDescriptor FILE_ENCODING = new PropertyDescriptor.Builder()
.name("File Encoding")
.displayName("File Encoding")
.description("Output encoding for encrypted files. Binary encoding provides optimal processing performance.")
.required(true)
.defaultValue(FileEncoding.BINARY.getValue())
.allowableValues(FileEncoding.class)
.build();
static final PropertyDescriptor PUBLIC_KEY_SOURCE = new PropertyDescriptor.Builder()
.name("Public Key Source")
.displayName("Public Key Source")
.description("Source of information determines the loading strategy for X25519 Public Key Recipients")
.required(true)
.defaultValue(KeySource.PROPERTIES.getValue())
.allowableValues(KeySource.class)
.build();
static final PropertyDescriptor PUBLIC_KEY_RECIPIENTS = new PropertyDescriptor.Builder()
.name("Public Key Recipients")
.displayName("Public Key Recipients")
.description("One or more X25519 Public Key Recipients, separated with newlines, encoded according to the age specification, starting with age1")
.required(true)
.sensitive(true)
.addValidator(new AgeKeyValidator(AgeKeyIndicator.PUBLIC_KEY))
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT)
.dependsOn(PUBLIC_KEY_SOURCE, KeySource.PROPERTIES)
.build();
static final PropertyDescriptor PUBLIC_KEY_RECIPIENT_RESOURCES = new PropertyDescriptor.Builder()
.name("Public Key Recipient Resources")
.displayName("Public Key Recipient Resources")
.description("One or more files or URLs containing X25519 Public Key Recipients, separated with newlines, encoded according to the age specification, starting with age1")
.required(true)
.addValidator(new AgeKeyValidator(AgeKeyIndicator.PUBLIC_KEY))
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.URL)
.dependsOn(PUBLIC_KEY_SOURCE, KeySource.RESOURCES)
.build();
private static final Set<Relationship> RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE));
private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
FILE_ENCODING,
PUBLIC_KEY_SOURCE,
PUBLIC_KEY_RECIPIENTS,
PUBLIC_KEY_RECIPIENT_RESOURCES
);
private static final Provider CIPHER_PROVIDER = AgeProviderResolver.getCipherProvider();
private static final AgeKeyReader<RecipientStanzaWriter> PUBLIC_KEY_READER = new AgePublicKeyReader();
/** 64 Kilobyte buffer aligns with age-encryption payload chunk sizing */
private static final int BUFFER_CAPACITY = 65535;
private static final String KEY_VERIFICATION_STEP = "Verify Public Key Recipients";
private static final String NOT_FOUND_EXPLANATION = "Public Key Recipients not found";
private volatile List<RecipientStanzaWriter> configuredRecipientStanzaWriters = Collections.emptyList();
/**
* Get Relationships
*
* @return Processor Relationships
*/
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
/**
* Get Supported Property Descriptors
*
* @return Processor Supported Property Descriptors
*/
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
/**
* Verify Public Key Identities
*
* @param context Process Context with configured properties
* @param verificationLogger Logger for writing verification results
* @param attributes Sample FlowFile attributes for property value resolution
* @return Configuration Verification Results
*/
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
final ConfigVerificationResult.Builder verificationBuilder = new ConfigVerificationResult.Builder()
.verificationStepName(KEY_VERIFICATION_STEP);
try {
final List<RecipientStanzaWriter> recipientStanzaWriters = getRecipientStanzaWriters(context);
if (recipientStanzaWriters.isEmpty()) {
verificationLogger.warn(NOT_FOUND_EXPLANATION);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(NOT_FOUND_EXPLANATION);
} else {
final String explanation = String.format("Public Key Recipients found: %d", recipientStanzaWriters.size());
verificationLogger.info(explanation);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(explanation);
}
} catch (final Exception e) {
final String explanation = String.format("Public Key Recipients not found: %s", e.getMessage());
verificationLogger.warn("Public Key Recipients not found", e);
verificationBuilder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(explanation);
}
results.add(verificationBuilder.build());
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
configuredRecipientStanzaWriters = getRecipientStanzaWriters(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
final String fileEncodingValue = context.getProperty(FILE_ENCODING).getValue();
final FileEncoding fileEncoding = FileEncoding.valueOf(fileEncodingValue);
final EncryptingChannelFactory encryptingChannelFactory = getEncryptingChannelFactory(fileEncoding);
final StreamCallback streamCallback = new EncryptingStreamCallback(configuredRecipientStanzaWriters, encryptingChannelFactory);
flowFile = session.write(flowFile, streamCallback);
session.transfer(flowFile, SUCCESS);
} catch (final Exception e) {
getLogger().error("Encryption Failed {}", flowFile, e);
session.transfer(flowFile, FAILURE);
}
}
private EncryptingChannelFactory getEncryptingChannelFactory(final FileEncoding fileEncoding) {
final EncryptingChannelFactory encryptingChannelFactory;
if (FileEncoding.ASCII == fileEncoding) {
encryptingChannelFactory = new ArmoredEncryptingChannelFactory(CIPHER_PROVIDER);
} else {
encryptingChannelFactory = new StandardEncryptingChannelFactory(CIPHER_PROVIDER);
}
return encryptingChannelFactory;
}
private List<RecipientStanzaWriter> getRecipientStanzaWriters(final PropertyContext context) throws IOException {
final KeySource keySource = KeySource.valueOf(context.getProperty(PUBLIC_KEY_SOURCE).getValue());
final List<ResourceReference> resources = new ArrayList<>();
if (KeySource.PROPERTIES == keySource) {
final ResourceReference resource = context.getProperty(PUBLIC_KEY_RECIPIENTS).asResource();
resources.add(resource);
} else {
final ResourceReferences resourceReferences = context.getProperty(PUBLIC_KEY_RECIPIENT_RESOURCES).asResources();
resources.addAll(resourceReferences.asList());
}
final List<RecipientStanzaWriter> recipientStanzaWriters = new ArrayList<>();
for (final ResourceReference resource : resources) {
try (final InputStream inputStream = resource.read()) {
final List<RecipientStanzaWriter> writers = PUBLIC_KEY_READER.read(inputStream);
recipientStanzaWriters.addAll(writers);
}
}
if (recipientStanzaWriters.isEmpty()) {
throw new IOException(NOT_FOUND_EXPLANATION);
}
return recipientStanzaWriters;
}
private static class EncryptingStreamCallback extends ChannelStreamCallback {
private final List<RecipientStanzaWriter> recipientStanzaWriters;
private final EncryptingChannelFactory encryptingChannelFactory;
private EncryptingStreamCallback(
final List<RecipientStanzaWriter> recipientStanzaWriters,
final EncryptingChannelFactory encryptingChannelFactory
) {
super(BUFFER_CAPACITY);
this.recipientStanzaWriters = recipientStanzaWriters;
this.encryptingChannelFactory = encryptingChannelFactory;
}
@Override
protected WritableByteChannel getWritableChannel(final OutputStream outputStream) throws IOException {
try {
final WritableByteChannel outputChannel = super.getWritableChannel(outputStream);
return encryptingChannelFactory.newEncryptingChannel(outputChannel, recipientStanzaWriters);
} catch (final GeneralSecurityException e) {
throw new IOException("Channel initialization failed", e);
}
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
/**
* Shared key reader abstraction for age-encryption.org public and private keys using configurable prefix and pattern
*
* @param <T> Type of parsed key
*/
abstract class AbstractAgeKeyReader<T> implements AgeKeyReader<T> {
private final AgeKeyIndicator ageKeyIndicator;
/**
* Key Reader with prefix for initial line filtering and pattern for subsequent matching
*
* @param ageKeyIndicator Key Indicator
*/
AbstractAgeKeyReader(final AgeKeyIndicator ageKeyIndicator) {
this.ageKeyIndicator = ageKeyIndicator;
}
/**
* Read keys from Input Stream based on lines starting with indicated prefix and matched against configured pattern
*
* @param inputStream Input Stream to be parsed
* @return Parsed key objects
* @throws IOException Thrown on failures reading Input Stream
*/
@Override
public List<T> read(final InputStream inputStream) throws IOException {
Objects.requireNonNull(inputStream, "Input Stream required");
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
final Set<String> keys = reader.lines()
.filter(line -> line.startsWith(ageKeyIndicator.getPrefix()))
.map(ageKeyIndicator.getPattern()::matcher)
.filter(Matcher::matches)
.map(Matcher::group)
.collect(Collectors.toSet());
return readKeys(keys);
}
}
/**
* Read encoded keys filtered from Input Stream based on matched pattern
*
* @param keys Encoded keys
* @return Parsed key objects
* @throws IOException Thrown on failures reading encoded keys
*/
protected abstract List<T> readKeys(Set<String> keys) throws IOException;
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import java.util.regex.Pattern;
/**
* Pattern indicators for age-encryption.org public and private keys
*/
public enum AgeKeyIndicator {
/** Human-Readable Part with Separator and enumeration of allowed Bech32 uppercase characters from BIP 0173 */
PRIVATE_KEY("AGE-SECRET-KEY-1", Pattern.compile("^AGE-SECRET-KEY-1[QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L]{58}$")),
/** Human-Readable Part with Separator and enumeration of allowed Bech32 lowercase characters from BIP 0173 */
PUBLIC_KEY("age1", Pattern.compile("^age1[qpzry9x8gf2tvdw0s3jn54khce6mua7l]{58}$"));
private final String prefix;
private final Pattern pattern;
AgeKeyIndicator(final String prefix, final Pattern pattern) {
this.prefix = prefix;
this.pattern = pattern;
}
public String getPrefix() {
return prefix;
}
public Pattern getPattern() {
return pattern;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
* Abstraction for reading public or private keys encoded using Bech32 supporting age-encryption.org
*
* @param <T> Key Type
*/
public interface AgeKeyReader<T> {
/**
* Read Input Stream and return collection of parsed objects corresponding to parsed keys
*
* @param inputStream Input Stream to be parsed
* @return Collection of parsed objects
* @throws IOException Thrown on failure to read Input Stream
*/
List<T> read(InputStream inputStream) throws IOException;
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.resource.StandardResourceReferences;
import org.apache.nifi.components.resource.Utf8TextResource;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
/**
* Component Property Validator for age-encryption X25519 keys encoded using Bech32
*/
public class AgeKeyValidator implements Validator {
private static final String FILE_RESOURCE_EXPLANATION = "File resource validation passed";
private static final String RESOURCE_EXCEPTION = "Read failed: %s";
private static final String NOT_FOUND_EXPLANATION = "Encoded keys not found";
private static final String INVALID_EXPLANATION = "Invalid keys found [%d]";
private static final String VALID_EXPLANATION = "Valid keys found";
private static final String EXPLANATION_SEPARATOR = " and ";
private final AgeKeyIndicator ageKeyIndicator;
/**
* Key Validator with prefix for initial line filtering and pattern for subsequent matching
*
* @param ageKeyIndicator Key Indicator
*/
public AgeKeyValidator(final AgeKeyIndicator ageKeyIndicator) {
this.ageKeyIndicator = ageKeyIndicator;
}
/**
* Validate property reads one or more lines from a newline-delimited string and requires at least one valid key
*
* @param subject Property to be validated
* @param input Property Value to be validated
* @param context Validation Context for property resolution
* @return Validation Result based on finding at least one valid key
*/
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final PropertyValue propertyValue = getPropertyValue(subject, context);
ResourceReferences resources = propertyValue.asResources();
if (resources == null) {
final ResourceReference resourceReference = new Utf8TextResource(input);
resources = new StandardResourceReferences(Collections.singletonList(resourceReference));
}
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject(subject);
final Set<ValidationResult> results = resources.asList().stream().map(this::validateResource).collect(Collectors.toSet());
final String invalidExplanation = results.stream()
.filter(Predicate.not(ValidationResult::isValid))
.map(ValidationResult::getExplanation)
.collect(Collectors.joining(EXPLANATION_SEPARATOR));
if (invalidExplanation.isEmpty()) {
builder.explanation(VALID_EXPLANATION).valid(true);
} else {
builder.explanation(invalidExplanation).valid(false);
}
return builder.build();
}
private PropertyValue getPropertyValue(final String subject, final ValidationContext context) {
final Optional<PropertyDescriptor> propertyFound = context.getProperties()
.keySet()
.stream()
.filter(s -> s.getName().contentEquals(subject))
.findFirst();
final String message = String.format("Property [%s] not found", subject);
final PropertyDescriptor propertyDescriptor = propertyFound.orElseThrow(() -> new IllegalArgumentException(message));
return context.getProperty(propertyDescriptor);
}
private ValidationResult validateResource(final ResourceReference resource) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
final ResourceType resourceType = resource.getResourceType();
if (ResourceType.TEXT == resourceType) {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resource.read()))) {
final Set<String> prefixedLines = reader.lines()
.filter(line -> line.startsWith(ageKeyIndicator.getPrefix()))
.collect(Collectors.toSet());
final long invalid = prefixedLines.stream()
.map(ageKeyIndicator.getPattern()::matcher)
.filter(Predicate.not(Matcher::matches))
.count();
if (prefixedLines.size() == 0) {
builder.explanation(NOT_FOUND_EXPLANATION).valid(false);
} else if (invalid == 0) {
builder.explanation(VALID_EXPLANATION).valid(true);
} else {
final String explanation = String.format(INVALID_EXPLANATION, invalid);
builder.explanation(explanation).valid(false);
}
} catch (final Exception e) {
final String explanation = String.format(RESOURCE_EXCEPTION, e.getMessage());
builder.explanation(explanation).valid(false);
}
} else {
builder.explanation(FILE_RESOURCE_EXPLANATION).valid(true);
}
return builder.build();
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import com.exceptionfactory.jagged.RecipientStanzaReader;
import com.exceptionfactory.jagged.x25519.X25519RecipientStanzaReaderFactory;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* X25519 Private Key implementation age Key Reader
*/
public class AgePrivateKeyReader extends AbstractAgeKeyReader<RecipientStanzaReader> {
private static final Provider KEY_PROVIDER = AgeProviderResolver.getKeyProvider().orElse(null);
public AgePrivateKeyReader() {
super(AgeKeyIndicator.PRIVATE_KEY);
}
/**
* Read private keys and return Recipient Stanza Writers
*
* @param keys Set of private keys
* @return Recipient Stanza Writers
* @throws IOException Thrown on failures parsing private keys
*/
@Override
protected List<RecipientStanzaReader> readKeys(final Set<String> keys) throws IOException {
final List<RecipientStanzaReader> recipientStanzaReaders = new ArrayList<>();
for (final String encodedPrivateKey : keys) {
try {
final RecipientStanzaReader recipientStanzaReader = getRecipientStanzaReader(encodedPrivateKey);
recipientStanzaReaders.add(recipientStanzaReader);
} catch (final Exception e) {
throw new IOException("Parsing Private Key Identities failed", e);
}
}
return recipientStanzaReaders;
}
private RecipientStanzaReader getRecipientStanzaReader(final String encodedPrivateKey) throws GeneralSecurityException {
final RecipientStanzaReader recipientStanzaReader;
if (KEY_PROVIDER == null) {
recipientStanzaReader = X25519RecipientStanzaReaderFactory.newRecipientStanzaReader(encodedPrivateKey);
} else {
recipientStanzaReader = X25519RecipientStanzaReaderFactory.newRecipientStanzaReader(encodedPrivateKey, KEY_PROVIDER);
}
return recipientStanzaReader;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import java.security.Provider;
import java.security.Security;
import java.util.Optional;
/**
* Resolver abstraction for age-encryption Security Algorithm Providers
*/
public final class AgeProviderResolver {
private static final String KEY_ALGORITHM_FILTER = "KeyFactory.X25519";
private static final String CIPHER_ALGORITHM_FILTER = "Cipher.ChaCha20-Poly1305";
private AgeProviderResolver() {
}
/**
* Get Java Security Provider supporting ChaCha20-Poly1305
*
* @return Provider supporting ChaCha20-Poly1305 Cipher that defaults to Bouncy Castle when no other Providers found
*/
public static Provider getCipherProvider() {
final Provider cipherProvider;
final Provider[] providers = Security.getProviders(CIPHER_ALGORITHM_FILTER);
if (providers == null) {
cipherProvider = new BouncyCastleProvider();
} else {
cipherProvider = providers[0];
}
return cipherProvider;
}
/**
* Get available Java Security Provider supporting X25519 and ChaCha20-Poly1305 when registered Provider not found
*
* @return Empty Provider indicating platform support for X25519 Key Factory or Bouncy Castle when no other Providers found
*/
public static Optional<Provider> getKeyProvider() {
final Provider keyProvider;
final Provider[] providers = Security.getProviders(KEY_ALGORITHM_FILTER);
if (providers == null) {
keyProvider = new BouncyCastleProvider();
} else {
keyProvider = null;
}
return Optional.ofNullable(keyProvider);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import com.exceptionfactory.jagged.RecipientStanzaWriter;
import com.exceptionfactory.jagged.x25519.X25519RecipientStanzaWriterFactory;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* X25519 Public Key implementation age Key Reader
*/
public class AgePublicKeyReader extends AbstractAgeKeyReader<RecipientStanzaWriter> {
private static final Provider KEY_PROVIDER = AgeProviderResolver.getKeyProvider().orElse(null);
public AgePublicKeyReader() {
super(AgeKeyIndicator.PUBLIC_KEY);
}
/**
* Read public keys and return Recipient Stanza Writers
*
* @param keys Set of public keys
* @return Recipient Stanza Writers
* @throws IOException Thrown on failure to parse public keys
*/
@Override
protected List<RecipientStanzaWriter> readKeys(final Set<String> keys) throws IOException {
final List<RecipientStanzaWriter> recipientStanzaWriters = new ArrayList<>();
for (final String encodedPublicKey : keys) {
try {
final RecipientStanzaWriter recipientStanzaWriter = getRecipientStanzaWriter(encodedPublicKey);
recipientStanzaWriters.add(recipientStanzaWriter);
} catch (final Exception e) {
throw new IOException("Parsing Public Key Recipients failed", e);
}
}
return recipientStanzaWriters;
}
private RecipientStanzaWriter getRecipientStanzaWriter(final String encodedPublicKey) throws GeneralSecurityException {
final RecipientStanzaWriter recipientStanzaWriter;
if (KEY_PROVIDER == null) {
recipientStanzaWriter = X25519RecipientStanzaWriterFactory.newRecipientStanzaWriter(encodedPublicKey);
} else {
recipientStanzaWriter = X25519RecipientStanzaWriterFactory.newRecipientStanzaWriter(encodedPublicKey, KEY_PROVIDER);
}
return recipientStanzaWriter;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import org.apache.nifi.components.DescribedValue;
/**
* File Encoding options supporting Binary or ASCII Armor
*/
public enum FileEncoding implements DescribedValue {
BINARY("Binary encoding"),
ASCII("ASCII Armor encoding");
private final String description;
FileEncoding(final String description) {
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return name();
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import org.apache.nifi.components.DescribedValue;
/**
* Key resource loading strategy
*/
public enum KeySource implements DescribedValue {
PROPERTIES("Load one or more keys from configured properties"),
RESOURCES("Load one or more keys from files or URLs");
private final String description;
KeySource(final String description) {
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return name();
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.io;
import org.apache.nifi.processor.io.StreamCallback;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
/**
* Channel Stream Callback wraps Java IO Streams in Java NIO Channels with methods for customization of channels
*/
public class ChannelStreamCallback implements StreamCallback {
private static final int END_OF_FILE = -1;
private final int bufferCapacity;
/**
* Channel Stream Callback with configurable buffer capacity for transferring bytes
*
* @param bufferCapacity Buffer capacity in bytes for transferring between channels
*/
public ChannelStreamCallback(final int bufferCapacity) {
this.bufferCapacity = bufferCapacity;
}
/**
* Process streams using NIO Channels with configured Buffer
*
* @param inputStream Input Stream to be wrapped using a Readable Byte Channel
* @param outputStream Output Stream to be wrapped using a Writable Byte Channel
* @throws IOException Thrown on read or write failures
*/
@Override
public void process(final InputStream inputStream, final OutputStream outputStream) throws IOException {
try (
final ReadableByteChannel readableByteChannel = getReadableChannel(inputStream);
final WritableByteChannel outputChannel = getWritableChannel(outputStream)
) {
final ByteBuffer buffer = ByteBuffer.allocate(bufferCapacity);
while (readableByteChannel.read(buffer) != END_OF_FILE) {
buffer.flip();
while (buffer.hasRemaining()) {
outputChannel.write(buffer);
}
buffer.clear();
}
buffer.flip();
if (buffer.hasRemaining()) {
while (buffer.hasRemaining()) {
outputChannel.write(buffer);
}
}
}
}
/**
* Get Readable Byte Channel defaults to using Channels.newChannel()
*
* @param inputStream Input Stream to be wrapped
* @return Readable Byte Channel wrapping provided Input Stream
* @throws IOException Throw on channel initialization failures
*/
protected ReadableByteChannel getReadableChannel(final InputStream inputStream) throws IOException {
return Channels.newChannel(inputStream);
}
/**
* Get Writable Byte Channel defaults to using Channels.newChannel()
*
* @param outputStream Output Stream to be wrapped
* @return Writable Byte Channel wrapping provided Output Stream
* @throws IOException Thrown on channel initialization failures
*/
protected WritableByteChannel getWritableChannel(final OutputStream outputStream) throws IOException {
return Channels.newChannel(outputStream);
}
}

View File

@ -14,4 +14,6 @@
# limitations under the License.
org.apache.nifi.processors.cipher.DecryptContentCompatibility
org.apache.nifi.processors.cipher.DecryptContent
org.apache.nifi.processors.cipher.DecryptContentAge
org.apache.nifi.processors.cipher.EncryptContentAge
org.apache.nifi.processors.cipher.VerifyContentMAC

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher;
import com.exceptionfactory.jagged.EncryptingChannelFactory;
import com.exceptionfactory.jagged.RecipientStanzaWriter;
import com.exceptionfactory.jagged.framework.armor.ArmoredEncryptingChannelFactory;
import com.exceptionfactory.jagged.framework.stream.StandardEncryptingChannelFactory;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.cipher.age.AgeProviderResolver;
import org.apache.nifi.processors.cipher.age.AgePublicKeyReader;
import org.apache.nifi.processors.cipher.age.FileEncoding;
import org.apache.nifi.processors.cipher.age.KeySource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.opentest4j.AssertionFailedError;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class DecryptContentAgeTest {
private static final byte[] WORD = {'W', 'O', 'R', 'D'};
private static final String CONTENT_WORD = new String(WORD);
private static final String PRIVATE_KEY_CHECKSUM_INVALID = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFFF";
private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN";
private static final String PRIVATE_KEY_SECOND = "AGE-SECRET-KEY-1AU0T8M9GWJ4PEQK9TGS54T6VHRL8DLFTZ7AWYJDFTLMDZZWZQKDSA8K882";
private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg";
private static final String KEY_FILE_SUFFIX = ".key";
private TestRunner runner;
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(DecryptContentAge.class);
}
@Test
void testRequiredProperties() {
runner.assertNotValid();
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED);
runner.assertValid();
}
@Test
void testPrivateKeyNotValid() {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PUBLIC_KEY_ENCODED);
runner.assertNotValid();
}
@Test
void testVerifySuccessful() {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.SUCCESSFUL);
}
@Test
void testVerifyFailedChecksumInvalid() {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_CHECKSUM_INVALID);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED);
}
@Test
void testVerifyFailedResourcesNotConfigured() {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED);
}
@Test
void testRunSuccessAscii() throws GeneralSecurityException, IOException {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertSuccess(FileEncoding.ASCII);
}
@Test
void testRunSuccessBinary() throws GeneralSecurityException, IOException {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertSuccess(FileEncoding.BINARY);
}
@Test
void testRunSuccessBinaryMultiplePrivateKeys() throws GeneralSecurityException, IOException {
final String multiplePrivateKeys = String.format("%s%n%s", PRIVATE_KEY_ENCODED, PRIVATE_KEY_SECOND);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, multiplePrivateKeys);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertSuccess(FileEncoding.BINARY);
}
@Test
void testRunSuccessBinaryKeySourceResources(@TempDir final Path tempDir) throws GeneralSecurityException, IOException {
final Path tempFile = createTempFile(tempDir);
Files.writeString(tempFile, PRIVATE_KEY_ENCODED);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITY_RESOURCES, tempFile.toString());
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue());
assertSuccess(FileEncoding.BINARY);
}
@Test
void testRunScheduledErrorPrivateKeyNotFound(@TempDir final Path tempDir) throws GeneralSecurityException, IOException {
final Path tempFile = createTempFile(tempDir);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITY_RESOURCES, tempFile.toString());
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.RESOURCES.getValue());
final byte[] encrypted = getEncrypted(FileEncoding.BINARY);
runner.enqueue(encrypted);
assertThrows(AssertionFailedError.class, runner::run);
}
@Test
void testRunFailure() {
runner.setProperty(DecryptContentAge.PRIVATE_KEY_IDENTITIES, PRIVATE_KEY_ENCODED);
runner.setProperty(DecryptContentAge.PRIVATE_KEY_SOURCE, KeySource.PROPERTIES.getValue());
runner.enqueue(WORD);
runner.run();
runner.assertAllFlowFilesTransferred(DecryptContentAge.FAILURE);
}
private void assertSuccess(final FileEncoding fileEncoding) throws GeneralSecurityException, IOException {
final byte[] encrypted = getEncrypted(fileEncoding);
runner.enqueue(encrypted);
runner.run();
runner.assertAllFlowFilesTransferred(DecryptContentAge.SUCCESS);
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DecryptContentAge.SUCCESS).iterator();
assertTrue(flowFiles.hasNext());
final MockFlowFile flowFile = flowFiles.next();
final String content = flowFile.getContent();
assertEquals(CONTENT_WORD, content);
}
private void assertVerificationResultOutcomeEquals(final ConfigVerificationResult.Outcome expected) {
final VerifiableProcessor processor = (VerifiableProcessor) runner.getProcessor();
final Iterator<ConfigVerificationResult> results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()).iterator();
assertTrue(results.hasNext());
final ConfigVerificationResult result = results.next();
assertEquals(expected, result.getOutcome());
}
private byte[] getEncrypted(final FileEncoding fileEncoding) throws GeneralSecurityException, IOException {
final EncryptingChannelFactory encryptingChannelFactory = getEncryptingChannelFactory(fileEncoding);
final AgePublicKeyReader publicKeyReader = new AgePublicKeyReader();
final ByteArrayInputStream encodedInputStream = new ByteArrayInputStream(PUBLIC_KEY_ENCODED.getBytes(StandardCharsets.UTF_8));
final List<RecipientStanzaWriter> recipientStanzaWriters = publicKeyReader.read(encodedInputStream);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final WritableByteChannel outputChannel = Channels.newChannel(outputStream);
try (final WritableByteChannel encryptingChannel = encryptingChannelFactory.newEncryptingChannel(outputChannel, recipientStanzaWriters)) {
final ByteBuffer buffer = ByteBuffer.wrap(WORD);
encryptingChannel.write(buffer);
}
return outputStream.toByteArray();
}
private EncryptingChannelFactory getEncryptingChannelFactory(final FileEncoding fileEncoding) {
final Provider provider = AgeProviderResolver.getCipherProvider();
final EncryptingChannelFactory encryptingChannelFactory;
if (FileEncoding.ASCII == fileEncoding) {
encryptingChannelFactory = new ArmoredEncryptingChannelFactory(provider);
} else {
encryptingChannelFactory = new StandardEncryptingChannelFactory(provider);
}
return encryptingChannelFactory;
}
private Path createTempFile(final Path tempDir) throws IOException {
return Files.createTempFile(tempDir, DecryptContentAgeTest.class.getSimpleName(), KEY_FILE_SUFFIX);
}
}

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher;
import com.exceptionfactory.jagged.DecryptingChannelFactory;
import com.exceptionfactory.jagged.RecipientStanzaReader;
import com.exceptionfactory.jagged.framework.armor.ArmoredDecryptingChannelFactory;
import com.exceptionfactory.jagged.framework.stream.StandardDecryptingChannelFactory;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.cipher.age.AgePrivateKeyReader;
import org.apache.nifi.processors.cipher.age.AgeProviderResolver;
import org.apache.nifi.processors.cipher.age.FileEncoding;
import org.apache.nifi.processors.cipher.age.KeySource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.opentest4j.AssertionFailedError;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.Provider;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class EncryptContentAgeTest {
private static final String BINARY_VERSION = "age-encryption.org";
private static final String ASCII_HEADER = "-----BEGIN AGE ENCRYPTED FILE-----";
private static final byte[] WORD = {'W', 'O', 'R', 'D'};
private static final int BUFFER_CAPACITY = 20;
private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN";
private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg";
private static final String PUBLIC_KEY_CHECKSUM_INVALID = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn000";
private static final String KEY_FILE_SUFFIX = ".key";
private TestRunner runner;
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(EncryptContentAge.class);
}
@Test
void testRequiredProperties() {
runner.assertNotValid();
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED);
runner.assertValid();
}
@Test
void testPublicKeyNotValid() {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PRIVATE_KEY_ENCODED);
runner.assertNotValid();
}
@Test
void testVerifySuccessful() {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.SUCCESSFUL);
}
@Test
void testVerifyFailedChecksumInvalid() {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_CHECKSUM_INVALID);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED);
}
@Test
void testVerifyFailedResourcesNotFound() {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue());
assertVerificationResultOutcomeEquals(ConfigVerificationResult.Outcome.FAILED);
}
@Test
void testRunSuccessAscii() throws GeneralSecurityException, IOException {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue());
runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.ASCII.getValue());
runner.enqueue(WORD);
runner.run();
runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS);
assertAsciiFound();
}
@Test
void testRunSuccessBinary() throws GeneralSecurityException, IOException {
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENTS, PUBLIC_KEY_ENCODED);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.PROPERTIES.getValue());
runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.BINARY.getValue());
runner.enqueue(WORD);
runner.run();
runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS);
assertBinaryFound();
}
@Test
void testRunSuccessBinaryKeySourceResources(@TempDir final Path tempDir) throws GeneralSecurityException, IOException {
final Path tempFile = createTempFile(tempDir);
Files.writeString(tempFile, PUBLIC_KEY_ENCODED);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENT_RESOURCES, tempFile.toString());
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue());
runner.setProperty(EncryptContentAge.FILE_ENCODING, FileEncoding.BINARY.getValue());
runner.enqueue(WORD);
runner.run();
runner.assertAllFlowFilesTransferred(EncryptContentAge.SUCCESS);
assertBinaryFound();
}
@Test
void testRunScheduledErrorPublicKeyNotFound(@TempDir final Path tempDir) throws IOException {
final Path tempFile = createTempFile(tempDir);
runner.setProperty(EncryptContentAge.PUBLIC_KEY_RECIPIENT_RESOURCES, tempFile.toString());
runner.setProperty(EncryptContentAge.PUBLIC_KEY_SOURCE, KeySource.RESOURCES.getValue());
runner.enqueue(WORD);
assertThrows(AssertionFailedError.class, runner::run);
}
private void assertBinaryFound() throws GeneralSecurityException, IOException {
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(EncryptContentAge.SUCCESS).iterator();
assertTrue(flowFiles.hasNext());
final MockFlowFile flowFile = flowFiles.next();
final String content = flowFile.getContent();
assertTrue(content.startsWith(BINARY_VERSION));
final byte[] decrypted = getDecrypted(FileEncoding.BINARY, flowFile.getContentStream());
assertArrayEquals(WORD, decrypted);
}
private void assertAsciiFound() throws GeneralSecurityException, IOException {
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(EncryptContentAge.SUCCESS).iterator();
assertTrue(flowFiles.hasNext());
final MockFlowFile flowFile = flowFiles.next();
final String content = flowFile.getContent();
assertTrue(content.startsWith(ASCII_HEADER));
final byte[] decrypted = getDecrypted(FileEncoding.ASCII, flowFile.getContentStream());
assertArrayEquals(WORD, decrypted);
}
private void assertVerificationResultOutcomeEquals(final ConfigVerificationResult.Outcome expected) {
final VerifiableProcessor processor = (VerifiableProcessor) runner.getProcessor();
final Iterator<ConfigVerificationResult> results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()).iterator();
assertTrue(results.hasNext());
final ConfigVerificationResult result = results.next();
assertEquals(expected, result.getOutcome());
}
private byte[] getDecrypted(final FileEncoding fileEncoding, final InputStream inputStream) throws GeneralSecurityException, IOException {
final DecryptingChannelFactory decryptingChannelFactory = getDecryptingChannelFactory(fileEncoding);
final AgePrivateKeyReader privateKeyReader = new AgePrivateKeyReader();
final ByteArrayInputStream encodedInputStream = new ByteArrayInputStream(PRIVATE_KEY_ENCODED.getBytes(StandardCharsets.UTF_8));
final List<RecipientStanzaReader> recipientStanzaReaders = privateKeyReader.read(encodedInputStream);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final WritableByteChannel outputChannel = Channels.newChannel(outputStream);
try (final ReadableByteChannel decryptingChannel = decryptingChannelFactory.newDecryptingChannel(Channels.newChannel(inputStream), recipientStanzaReaders)) {
final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_CAPACITY);
decryptingChannel.read(buffer);
buffer.flip();
outputChannel.write(buffer);
}
return outputStream.toByteArray();
}
private DecryptingChannelFactory getDecryptingChannelFactory(final FileEncoding fileEncoding) {
final Provider provider = AgeProviderResolver.getCipherProvider();
final DecryptingChannelFactory decryptingChannelFactory;
if (FileEncoding.ASCII == fileEncoding) {
decryptingChannelFactory = new ArmoredDecryptingChannelFactory(provider);
} else {
decryptingChannelFactory = new StandardDecryptingChannelFactory(provider);
}
return decryptingChannelFactory;
}
private Path createTempFile(final Path tempDir) throws IOException {
return Files.createTempFile(tempDir, EncryptContentAgeTest.class.getSimpleName(), KEY_FILE_SUFFIX);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import com.exceptionfactory.jagged.RecipientStanzaReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class AgePrivateKeyReaderTest {
private static final String PRIVATE_KEY_ENCODED = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFTN";
private static final String PRIVATE_KEY_CHECKSUM_INVALID = "AGE-SECRET-KEY-1NZKRS8L39Y2KVLXT7XX4DHFVDUUWN0699NJYR2EJS8RWGRYN279Q8GSFFF";
private AgePrivateKeyReader reader;
@BeforeEach
void setReader() {
reader = new AgePrivateKeyReader();
}
@Test
void testRead() throws IOException {
final InputStream inputStream = new ByteArrayInputStream(PRIVATE_KEY_ENCODED.getBytes(StandardCharsets.UTF_8));
final Iterator<RecipientStanzaReader> recipientStanzaReaders = reader.read(inputStream).iterator();
assertTrue(recipientStanzaReaders.hasNext());
final RecipientStanzaReader recipientStanzaReader = recipientStanzaReaders.next();
assertNotNull(recipientStanzaReader);
assertFalse(recipientStanzaReaders.hasNext());
}
@Test
void testReadChecksumInvalid() {
final InputStream inputStream = new ByteArrayInputStream(PRIVATE_KEY_CHECKSUM_INVALID.getBytes(StandardCharsets.UTF_8));
assertThrows(IOException.class, () -> reader.read(inputStream));
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.age;
import com.exceptionfactory.jagged.RecipientStanzaWriter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class AgePublicKeyReaderTest {
private static final String PUBLIC_KEY_ENCODED = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn5gg";
private static final String PUBLIC_KEY_CHECKSUM_INVALID = "age1qqnete0p2wzcc7y55trm3c4jzr608g4xdvyfw9jurugyt6maauussgn000";
private AgePublicKeyReader reader;
@BeforeEach
void setReader() {
reader = new AgePublicKeyReader();
}
@Test
void testRead() throws IOException {
final InputStream inputStream = new ByteArrayInputStream(PUBLIC_KEY_ENCODED.getBytes(StandardCharsets.UTF_8));
final Iterator<RecipientStanzaWriter> recipientStanzaWriters = reader.read(inputStream).iterator();
assertTrue(recipientStanzaWriters.hasNext());
final RecipientStanzaWriter recipientStanzaWriter = recipientStanzaWriters.next();
assertNotNull(recipientStanzaWriter);
assertFalse(recipientStanzaWriters.hasNext());
}
@Test
void testReadChecksumInvalid() {
final InputStream inputStream = new ByteArrayInputStream(PUBLIC_KEY_CHECKSUM_INVALID.getBytes(StandardCharsets.UTF_8));
assertThrows(IOException.class, () -> reader.read(inputStream));
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cipher.io;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
class ChannelStreamCallbackTest {
private static final int BUFFER_CAPACITY = 8192;
private static final int BUFFER_CAPACITY_DOUBLED = 16384;
private static final byte BYTE = 7;
private static final int END_OF_FILE = -1;
@Test
void testProcessEmpty() throws IOException {
final byte[] expected = new byte[]{};
assertProcessSuccess(expected);
}
@Test
void testProcessSingleByte() throws IOException {
final byte[] expected = new byte[]{BYTE};
assertProcessSuccess(expected);
}
@Test
void testProcessBufferCapacity() throws IOException {
final byte[] expected = new byte[BUFFER_CAPACITY];
Arrays.fill(expected, BYTE);
assertProcessSuccess(expected);
}
@Test
void testProcessBufferCapacityDoubled() throws IOException {
final byte[] expected = new byte[BUFFER_CAPACITY_DOUBLED];
Arrays.fill(expected, BYTE);
assertProcessSuccess(expected);
}
@Test
void testProcessReadBufferRemaining() throws IOException {
final ChannelStreamCallback callback = new BufferRemainingChannelStreamCallback();
final byte[] expected = new byte[]{BYTE};
final ByteArrayInputStream inputStream = new ByteArrayInputStream(expected);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
callback.process(inputStream, outputStream);
assertArrayEquals(expected, outputStream.toByteArray());
}
private void assertProcessSuccess(final byte[] expected) throws IOException {
final ChannelStreamCallback callback = new ChannelStreamCallback(BUFFER_CAPACITY);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(expected);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
callback.process(inputStream, outputStream);
assertArrayEquals(expected, outputStream.toByteArray());
}
private static class BufferRemainingChannelStreamCallback extends ChannelStreamCallback {
public BufferRemainingChannelStreamCallback() {
super(BUFFER_CAPACITY);
}
protected ReadableByteChannel getReadableChannel(final InputStream inputStream) {
return new BufferRemainingReadableByteChannel();
}
}
private static class BufferRemainingReadableByteChannel implements ReadableByteChannel {
@Override
public int read(final ByteBuffer buffer) {
buffer.put(BYTE);
return END_OF_FILE;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public void close() {
}
}
}

View File

@ -29,4 +29,16 @@
<module>nifi-cipher-processors</module>
<module>nifi-cipher-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.exceptionfactory.jagged</groupId>
<artifactId>jagged-bom</artifactId>
<version>0.2.0</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
</project>