NIFI-30: Merged code into develop

This commit is contained in:
Mark Payne 2015-05-01 10:28:26 -04:00
commit 7db27c01b2
11 changed files with 1169 additions and 368 deletions

View File

@ -46,7 +46,10 @@ public enum EncryptionMethod {
SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true),
SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true),
SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true),
SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true);
SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true),
PGP("PGP", "BC", false),
PGP_ASCII_ARMOR("PGP-ASCII-ARMOR", "BC", false);
private final String algorithm;
private final String provider;
private final boolean unlimitedStrength;

View File

@ -80,6 +80,10 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk16</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpg-jdk16</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
@ -187,6 +191,8 @@
<exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude>
<exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude>
<exclude>src/test/resources/ScanAttribute/dictionary1</exclude>
<exclude>src/test/resources/TestEncryptContent/text.txt</exclude>
<exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude>
<exclude>src/test/resources/TestJson/json-sample.json</exclude>
<exclude>src/test/resources/TestMergeContent/demarcate</exclude>

View File

@ -1,282 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.util.StopWatch;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.SecureRandom;
import java.security.Security;
import java.text.Normalizer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"encryption", "decryption", "password", "JCE"})
@CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly generated salt")
public class EncryptContent extends AbstractProcessor {
public static final String ENCRYPT_MODE = "Encrypt";
public static final String DECRYPT_MODE = "Decrypt";
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
public static final int DEFAULT_SALT_SIZE = 8;
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Specifies whether the content should be encrypted or decrypted")
.required(true)
.allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
.defaultValue(ENCRYPT_MODE)
.build();
public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder()
.name("Encryption Algorithm")
.description("The Encryption Algorithm to use")
.required(true)
.allowableValues(EncryptionMethod.values())
.defaultValue(EncryptionMethod.MD5_256AES.name())
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The Password to use for encrypting or decrypting the data")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully encrypted or decrypted will be routed to success")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
static {
Security.addProvider(new BouncyCastleProvider());
}
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(MODE);
properties.add(ENCRYPTION_ALGORITHM);
properties.add(PASSWORD);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue();
final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method);
final String providerName = encryptionMethod.getProvider();
final String algorithm = encryptionMethod.getAlgorithm();
final String password = context.getProperty(PASSWORD).getValue();
final char[] normalizedPassword = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword);
final SecureRandom secureRandom;
final SecretKeyFactory factory;
final SecretKey secretKey;
final Cipher cipher;
try {
secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
secureRandom.setSeed(System.currentTimeMillis());
factory = SecretKeyFactory.getInstance(algorithm, providerName);
secretKey = factory.generateSecret(pbeKeySpec);
cipher = Cipher.getInstance(algorithm, providerName);
} catch (final Exception e) {
logger.error("failed to initialize Encryption/Decryption algorithm due to {}", new Object[]{e});
session.transfer(flowFile, REL_FAILURE);
return;
}
final int algorithmBlockSize = cipher.getBlockSize();
final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
final StopWatch stopWatch = new StopWatch(true);
final String mode = context.getProperty(MODE).getValue();
try {
if (mode.equalsIgnoreCase(ENCRYPT_MODE)) {
final byte[] salt = new byte[saltSize];
secureRandom.nextBytes(salt);
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
} catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
logger.info("Successfully encrypted {}", new Object[]{flowFile});
} else {
if (flowFile.getSize() <= saltSize) {
logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
logger.info("successfully decrypted {}", new Object[]{flowFile});
}
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException pe) {
getLogger().error("Failed to {} {} due to {}; routing to failure", new Object[] {mode, flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
private static class DecryptCallback implements StreamCallback {
private final Cipher cipher;
private final SecretKey secretKey;
private final int saltSize;
public DecryptCallback(final Cipher cipher, final SecretKey secretKey, final int saltSize) {
this.cipher = cipher;
this.secretKey = secretKey;
this.saltSize = saltSize;
}
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
final byte[] salt = new byte[saltSize];
StreamUtils.fillBuffer(in, salt);
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
} catch (final Exception e) {
throw new ProcessException(e);
}
final byte[] buffer = new byte[65536];
int len;
while ((len = in.read(buffer)) > 0) {
final byte[] decryptedBytes = cipher.update(buffer, 0, len);
if (decryptedBytes != null) {
out.write(decryptedBytes);
}
}
try {
out.write(cipher.doFinal());
} catch (final Exception e) {
throw new ProcessException(e);
}
}
}
private static class EncryptCallback implements StreamCallback {
private final Cipher cipher;
private final byte[] salt;
public EncryptCallback(final Cipher cipher, final byte[] salt) {
this.cipher = cipher;
this.salt = salt;
}
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
out.write(salt);
final byte[] buffer = new byte[65536];
int len;
while ((len = in.read(buffer)) > 0) {
final byte[] encryptedBytes = cipher.update(buffer, 0, len);
if (encryptedBytes != null) {
out.write(encryptedBytes);
}
}
try {
out.write(cipher.doFinal());
} catch (final IllegalBlockSizeException | BadPaddingException e) {
throw new ProcessException(e);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.security.Security;
import java.text.Normalizer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.OpenPGPKeyBasedEncryptor;
import org.apache.nifi.processors.standard.util.OpenPGPPasswordBasedEncryptor;
import org.apache.nifi.processors.standard.util.PasswordBasedEncryptor;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.util.StopWatch;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"encryption", "decryption", "password", "JCE", "OpenPGP", "PGP", "GPG"})
@CapabilityDescription("Encrypts or Decrypts a FlowFile using either symmetric encryption with a password and randomly generated salt, or asymmetric encryption using a public and secret key.")
public class EncryptContent extends AbstractProcessor {
public static final String ENCRYPT_MODE = "Encrypt";
public static final String DECRYPT_MODE = "Decrypt";
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Specifies whether the content should be encrypted or decrypted")
.required(true)
.allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
.defaultValue(ENCRYPT_MODE)
.build();
public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder()
.name("Encryption Algorithm")
.description("The Encryption Algorithm to use")
.required(true)
.allowableValues(EncryptionMethod.values())
.defaultValue(EncryptionMethod.MD5_256AES.name())
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The Password to use for encrypting or decrypting the data")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor PUBLIC_KEYRING = new PropertyDescriptor.Builder()
.name("public-keyring-file")
.displayName("Public Keyring File")
.description("In a PGP encrypt mode, this keyring contains the public key of the recipient")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PUBLIC_KEY_USERID = new PropertyDescriptor.Builder()
.name("public-key-user-id")
.displayName("Public Key User Id")
.description("In a PGP encrypt mode, this user id of the recipient")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PRIVATE_KEYRING = new PropertyDescriptor.Builder()
.name("private-keyring-file")
.displayName("Private Keyring File")
.description("In a PGP decrypt mode, this keyring contains the private key of the recipient")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PRIVATE_KEYRING_PASSPHRASE = new PropertyDescriptor.Builder()
.name("private-keyring-passphrase")
.displayName("Private Keyring Passphrase")
.description("In a PGP decrypt mode, this is the private keyring passphrase")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
static {
// add BouncyCastle encryption providers
Security.addProvider(new BouncyCastleProvider());
}
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(MODE);
properties.add(ENCRYPTION_ALGORITHM);
properties.add(PASSWORD);
properties.add(PUBLIC_KEYRING);
properties.add(PUBLIC_KEY_USERID);
properties.add(PRIVATE_KEYRING);
properties.add(PRIVATE_KEYRING_PASSPHRASE);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
public static boolean isPGPAlgorithm(final String algorithm) {
return algorithm.startsWith("PGP");
}
public static boolean isPGPArmoredAlgorithm(final String algorithm) {
return isPGPAlgorithm(algorithm) && algorithm.endsWith("ASCII-ARMOR");
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue();
final String algorithm = EncryptionMethod.valueOf(method).getAlgorithm();
final String password = context.getProperty(PASSWORD).getValue();
if (isPGPAlgorithm(algorithm)) {
if (password == null) {
final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE);
if (encrypt) {
// need both public-keyring-file and public-key-user-id set
final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue();
final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue();
if (publicKeyring == null || publicUserId == null) {
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
.explanation(algorithm + " encryption without a " + PASSWORD.getDisplayName() + " requires both "
+ PUBLIC_KEYRING.getDisplayName() + " and " + PUBLIC_KEY_USERID.getDisplayName())
.build());
} else {
// verify the public keyring contains the user id
try {
if (OpenPGPKeyBasedEncryptor.getPublicKey(publicUserId, publicKeyring) == null) {
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
.explanation(PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring
+ " does not contain user id " + publicUserId)
.build());
}
} catch (final Exception e) {
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
.explanation("Invalid " + PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring
+ " because " + e.toString())
.build());
}
}
} else {
// need both private-keyring-file and private-keyring-passphrase set
final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue();
final String keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue();
if (privateKeyring == null || keyringPassphrase == null) {
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getName())
.explanation(algorithm + " decryption without a " + PASSWORD.getDisplayName() + " requires both "
+ PRIVATE_KEYRING.getDisplayName() + " and " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName())
.build());
} else {
final String providerName = EncryptionMethod.valueOf(method).getProvider();
// verify the passphrase works on the private keyring
try {
if (!OpenPGPKeyBasedEncryptor.validateKeyring(providerName, privateKeyring, keyringPassphrase.toCharArray())) {
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName())
.explanation(PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring
+ " could not be opened with the provided " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName())
.build());
}
} catch (final Exception e) {
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName())
.explanation("Invalid " + PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring
+ " because " + e.toString())
.build());
}
}
}
}
} else if (password == null) {
validationResults.add(new ValidationResult.Builder().subject(PASSWORD.getName())
.explanation(PASSWORD.getDisplayName() + " is required when using algorithm " + algorithm).build());
}
return validationResults;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue();
final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method);
final String providerName = encryptionMethod.getProvider();
final String algorithm = encryptionMethod.getAlgorithm();
final String password = context.getProperty(PASSWORD).getValue();
final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE);
Encryptor encryptor;
StreamCallback callback;
try {
if (isPGPAlgorithm(algorithm)) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue();
final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue();
if (encrypt && publicKeyring != null) {
final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue();
encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, publicKeyring, publicUserId, null, filename);
} else if (!encrypt && privateKeyring != null) {
final char[] keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue().toCharArray();
encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, privateKeyring, null, keyringPassphrase,
filename);
} else {
final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
encryptor = new OpenPGPPasswordBasedEncryptor(algorithm, providerName, passphrase, filename);
}
} else {
final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
encryptor = new PasswordBasedEncryptor(algorithm, providerName, passphrase);
}
if (encrypt) {
callback = encryptor.getEncryptionCallback();
} else {
callback = encryptor.getDecryptionCallback();
}
} catch (final Exception e) {
logger.error("Failed to initialize {}cryption algorithm because - ", new Object[] { encrypt ? "en" : "de", e });
session.rollback();
context.yield();
return;
}
try {
final StopWatch stopWatch = new StopWatch(true);
flowFile = session.write(flowFile, callback);
logger.info("successfully {}crypted {}", new Object[] { encrypt ? "en" : "de", flowFile });
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException e) {
logger.error("Cannot {}crypt {} - ", new Object[] { encrypt ? "en" : "de", flowFile, e });
session.transfer(flowFile, REL_FAILURE);
return;
}
}
public static interface Encryptor {
public StreamCallback getEncryptionCallback() throws Exception;
public StreamCallback getDecryptionCallback() throws Exception;
}
}

View File

@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchProviderException;
import java.security.SecureRandom;
import java.util.Date;
import java.util.Iterator;
import java.util.zip.Deflater;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.EncryptContent;
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
import org.bouncycastle.bcpg.ArmoredOutputStream;
import org.bouncycastle.openpgp.PGPCompressedData;
import org.bouncycastle.openpgp.PGPCompressedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedData;
import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedDataList;
import org.bouncycastle.openpgp.PGPException;
import org.bouncycastle.openpgp.PGPLiteralData;
import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
import org.bouncycastle.openpgp.PGPObjectFactory;
import org.bouncycastle.openpgp.PGPPrivateKey;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.bouncycastle.openpgp.PGPPublicKeyEncryptedData;
import org.bouncycastle.openpgp.PGPPublicKeyRing;
import org.bouncycastle.openpgp.PGPPublicKeyRingCollection;
import org.bouncycastle.openpgp.PGPSecretKey;
import org.bouncycastle.openpgp.PGPSecretKeyRing;
import org.bouncycastle.openpgp.PGPSecretKeyRingCollection;
import org.bouncycastle.openpgp.PGPUtil;
public class OpenPGPKeyBasedEncryptor implements Encryptor {
private String algorithm;
private String provider;
private String keyring;
private String userId;
private char[] passphrase;
private String filename;
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
public OpenPGPKeyBasedEncryptor(final String algorithm, final String provider, final String keyring, final String userId,
final char[] passphrase, final String filename) {
this.algorithm = algorithm;
this.provider = provider;
this.keyring = keyring;
this.userId = userId;
this.passphrase = passphrase;
this.filename = filename;
}
@Override
public StreamCallback getEncryptionCallback() throws Exception {
return new OpenPGPEncryptCallback(algorithm, provider, keyring, userId, filename);
}
@Override
public StreamCallback getDecryptionCallback() throws Exception {
return new OpenPGPDecryptCallback(provider, keyring, passphrase);
}
/*
* Validate secret keyring passphrase
*/
public static boolean validateKeyring(String provider, String secretKeyringFile, char[] passphrase) throws IOException,
PGPException, NoSuchProviderException {
PGPSecretKeyRingCollection pgpsec = new PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths
.get(secretKeyringFile))));
Iterator ringit = pgpsec.getKeyRings();
while (ringit.hasNext()) {
PGPSecretKeyRing secretkeyring = (PGPSecretKeyRing) ringit.next();
PGPSecretKey secretkey = secretkeyring.getSecretKey();
secretkey.extractPrivateKey(passphrase, provider);
return true;
}
return false;
}
/*
* Get the public key for a specific user id from a keyring.
*/
@SuppressWarnings("rawtypes")
public static PGPPublicKey getPublicKey(String userId, String publicKeyring) throws IOException, PGPException {
PGPPublicKey pubkey = null;
PGPPublicKeyRingCollection pgppub = new
PGPPublicKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths.get(publicKeyring))));
Iterator ringit = pgppub.getKeyRings();
while (ringit.hasNext()) {
PGPPublicKeyRing kring = (PGPPublicKeyRing) ringit.next();
Iterator keyit = kring.getPublicKeys();
while (keyit.hasNext()) {
pubkey = (PGPPublicKey) keyit.next();
boolean userIdMatch = false;
Iterator userit = pubkey.getUserIDs();
while (userit.hasNext()) {
String id = userit.next().toString();
if (id.contains(userId)) {
userIdMatch = true;
break;
}
}
if (pubkey.isEncryptionKey() && userIdMatch) {
return pubkey;
}
}
}
return null;
}
private class OpenPGPDecryptCallback implements StreamCallback {
private String provider;
private String secretKeyring;
private char[] passphrase;
OpenPGPDecryptCallback(final String provider, final String keyring, final char[] passphrase) {
this.provider = provider;
this.secretKeyring = keyring;
this.passphrase = passphrase;
}
@Override
public void process(InputStream in, OutputStream out) throws IOException {
InputStream pgpin = PGPUtil.getDecoderStream(in);
PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin);
Object obj = pgpFactory.nextObject();
if (!(obj instanceof PGPEncryptedDataList)) {
obj = pgpFactory.nextObject();
if (!(obj instanceof PGPEncryptedDataList)) {
throw new ProcessException("Invalid OpenPGP data");
}
}
PGPEncryptedDataList encList = (PGPEncryptedDataList) obj;
PGPSecretKeyRingCollection pgpSecretKeyring;
try {
// open secret keyring file
pgpSecretKeyring = new PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files
.newInputStream(Paths.get(secretKeyring))));
} catch (Exception e) {
throw new ProcessException("Invalid secret keyring - " + e.getMessage());
}
try {
PGPPrivateKey privateKey = null;
PGPPublicKeyEncryptedData encData = null;
// find the secret key in the encrypted data
Iterator it = encList.getEncryptedDataObjects();
while (privateKey == null && it.hasNext()) {
obj = it.next();
if (!(obj instanceof PGPPublicKeyEncryptedData)) {
throw new ProcessException("Invalid OpenPGP data");
}
encData = (PGPPublicKeyEncryptedData) obj;
PGPSecretKey secretkey = pgpSecretKeyring.getSecretKey(encData.getKeyID());
if (secretkey != null) {
privateKey = secretkey.extractPrivateKey(passphrase, provider);
}
}
if (privateKey == null) {
throw new ProcessException("Secret keyring does not contain the key required to decrypt");
}
InputStream clearData = encData.getDataStream(privateKey, provider);
PGPObjectFactory clearFactory = new PGPObjectFactory(clearData);
obj = clearFactory.nextObject();
if (obj instanceof PGPCompressedData) {
PGPCompressedData compData = (PGPCompressedData) obj;
clearFactory = new PGPObjectFactory(compData.getDataStream());
obj = clearFactory.nextObject();
}
PGPLiteralData literal = (PGPLiteralData) obj;
InputStream lis = literal.getInputStream();
final byte[] buffer = new byte[4096];
int len;
while ((len = lis.read(buffer)) >= 0) {
out.write(buffer, 0, len);
}
} catch (Exception e) {
throw new ProcessException(e.getMessage());
}
}
}
private class OpenPGPEncryptCallback implements StreamCallback {
private String algorithm;
private String provider;
private String publicKeyring;
private String userId;
private String filename;
OpenPGPEncryptCallback(final String algorithm, final String provider, final String keyring, final String userId,
final String filename) {
this.algorithm = algorithm;
this.provider = provider;
this.publicKeyring = keyring;
this.userId = userId;
this.filename = filename;
}
@Override
public void process(InputStream in, OutputStream out) throws IOException {
PGPPublicKey publicKey;
try {
publicKey = getPublicKey(userId, publicKeyring);
} catch (Exception e) {
throw new ProcessException("Invalid public keyring - " + e.getMessage());
}
try {
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
OutputStream output = out;
if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) {
output = new ArmoredOutputStream(out);
}
PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false,
secureRandom, provider);
encGenerator.addMethod(publicKey);
OutputStream encOut = encGenerator.open(output, new byte[65536]);
PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED);
OutputStream compOut = compData.open(encOut, new byte[65536]);
PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator();
OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]);
final byte[] buffer = new byte[4096];
int len;
while ((len = in.read(buffer)) >= 0) {
literalOut.write(buffer, 0, len);
}
literalOut.close();
compOut.close();
encOut.close();
output.close();
} catch (Exception e) {
throw new ProcessException(e.getMessage());
}
}
}
}

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
import java.util.Date;
import java.util.zip.Deflater;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.EncryptContent;
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
import org.bouncycastle.bcpg.ArmoredOutputStream;
import org.bouncycastle.openpgp.PGPCompressedData;
import org.bouncycastle.openpgp.PGPCompressedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedData;
import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedDataList;
import org.bouncycastle.openpgp.PGPLiteralData;
import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
import org.bouncycastle.openpgp.PGPObjectFactory;
import org.bouncycastle.openpgp.PGPPBEEncryptedData;
import org.bouncycastle.openpgp.PGPUtil;
public class OpenPGPPasswordBasedEncryptor implements Encryptor {
private String algorithm;
private String provider;
private char[] password;
private String filename;
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
public OpenPGPPasswordBasedEncryptor(final String algorithm, final String provider, final char[] passphrase, final String filename) {
this.algorithm = algorithm;
this.provider = provider;
this.password = passphrase;
this.filename = filename;
}
@Override
public StreamCallback getEncryptionCallback() throws Exception {
return new OpenPGPEncryptCallback(algorithm, provider, password, filename);
}
@Override
public StreamCallback getDecryptionCallback() throws Exception {
return new OpenPGPDecryptCallback(provider, password);
}
private class OpenPGPDecryptCallback implements StreamCallback {
private String provider;
private char[] password;
OpenPGPDecryptCallback(final String provider, final char[] password) {
this.provider = provider;
this.password = password;
}
@Override
public void process(InputStream in, OutputStream out) throws IOException {
InputStream pgpin = PGPUtil.getDecoderStream(in);
PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin);
Object obj = pgpFactory.nextObject();
if (!(obj instanceof PGPEncryptedDataList)) {
obj = pgpFactory.nextObject();
if (!(obj instanceof PGPEncryptedDataList)) {
throw new ProcessException("Invalid OpenPGP data");
}
}
PGPEncryptedDataList encList = (PGPEncryptedDataList) obj;
obj = encList.get(0);
if (!(obj instanceof PGPPBEEncryptedData)) {
throw new ProcessException("Invalid OpenPGP data");
}
PGPPBEEncryptedData encData = (PGPPBEEncryptedData) obj;
try {
InputStream clearData = encData.getDataStream(password, provider);
PGPObjectFactory clearFactory = new PGPObjectFactory(clearData);
obj = clearFactory.nextObject();
if (obj instanceof PGPCompressedData) {
PGPCompressedData compData = (PGPCompressedData) obj;
clearFactory = new PGPObjectFactory(compData.getDataStream());
obj = clearFactory.nextObject();
}
PGPLiteralData literal = (PGPLiteralData) obj;
InputStream lis = literal.getInputStream();
final byte[] buffer = new byte[4096];
int len;
while ((len = lis.read(buffer)) >= 0) {
out.write(buffer, 0, len);
}
} catch (Exception e) {
throw new ProcessException(e.getMessage());
}
}
}
private class OpenPGPEncryptCallback implements StreamCallback {
private String algorithm;
private String provider;
private char[] password;
private String filename;
OpenPGPEncryptCallback(final String algorithm, final String provider, final char[] password, final String filename) {
this.algorithm = algorithm;
this.provider = provider;
this.password = password;
this.filename = filename;
}
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try {
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
OutputStream output = out;
if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) {
output = new ArmoredOutputStream(out);
}
PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false,
secureRandom, provider);
encGenerator.addMethod(password);
OutputStream encOut = encGenerator.open(output, new byte[65536]);
PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED);
OutputStream compOut = compData.open(encOut, new byte[65536]);
PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator();
OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]);
final byte[] buffer = new byte[4096];
int len;
while ((len = in.read(buffer)) >= 0) {
literalOut.write(buffer, 0, len);
}
literalOut.close();
compOut.close();
encOut.close();
output.close();
} catch (Exception e) {
throw new ProcessException(e.getMessage());
}
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
import org.apache.nifi.stream.io.StreamUtils;
public class PasswordBasedEncryptor implements Encryptor {
private Cipher cipher;
private int saltSize;
private SecretKey secretKey;
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
public static final int DEFAULT_SALT_SIZE = 8;
public PasswordBasedEncryptor(final String algorithm, final String providerName, final char[] password) {
super();
try {
// initialize cipher
this.cipher = Cipher.getInstance(algorithm, providerName);
int algorithmBlockSize = cipher.getBlockSize();
this.saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
// initialize SecretKey from password
PBEKeySpec pbeKeySpec = new PBEKeySpec(password);
SecretKeyFactory factory = SecretKeyFactory.getInstance(algorithm, providerName);
this.secretKey = factory.generateSecret(pbeKeySpec);
} catch (Exception e) {
throw new ProcessException(e);
}
}
@Override
public StreamCallback getEncryptionCallback() throws ProcessException {
try {
byte[] salt = new byte[saltSize];
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
secureRandom.setSeed(System.currentTimeMillis());
secureRandom.nextBytes(salt);
return new EncryptCallback(salt);
} catch (Exception e) {
throw new ProcessException(e);
}
}
@Override
public StreamCallback getDecryptionCallback() throws ProcessException {
return new DecryptCallback();
}
private class DecryptCallback implements StreamCallback {
public DecryptCallback() {
}
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
final byte[] salt = new byte[saltSize];
try {
StreamUtils.fillBuffer(in, salt);
} catch (final EOFException e) {
throw new ProcessException("Cannot decrypt because file size is smaller than salt size", e);
}
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
} catch (final Exception e) {
throw new ProcessException(e);
}
final byte[] buffer = new byte[65536];
int len;
while ((len = in.read(buffer)) > 0) {
final byte[] decryptedBytes = cipher.update(buffer, 0, len);
if (decryptedBytes != null) {
out.write(decryptedBytes);
}
}
try {
out.write(cipher.doFinal());
} catch (final Exception e) {
throw new ProcessException(e);
}
}
}
private class EncryptCallback implements StreamCallback {
private final byte[] salt;
public EncryptCallback(final byte[] salt) {
this.salt = salt;
}
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
} catch (final Exception e) {
throw new ProcessException(e);
}
out.write(salt);
final byte[] buffer = new byte[65536];
int len;
while ((len = in.read(buffer)) > 0) {
final byte[] encryptedBytes = cipher.update(buffer, 0, len);
if (encryptedBytes != null) {
out.write(encryptedBytes);
}
}
try {
out.write(cipher.doFinal());
} catch (final IllegalBlockSizeException | BadPaddingException e) {
throw new ProcessException(e);
}
}
}
}

View File

@ -0,0 +1,30 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>EvaluateJsonPath</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<p>
<strong>Note:</strong> This processor supports OpenPGP algorithms that are compatible with third party programs.
However, it currently cannot add a digital signature to an encrypted FlowFile.
</p>
</body>
</html>

View File

@ -1,85 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestEncryptContent {
@Test
public void testRoundTrip() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent());
testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
for (final EncryptionMethod method : EncryptionMethod.values()) {
if (method.isUnlimitedStrength()) {
continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default.
}
testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name());
testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
testRunner.assertQueueEmpty();
testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
testRunner.enqueue(flowFile);
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
}
}
@Test
public void testDecryptNonEncryptedFile() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent());
testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
for (final EncryptionMethod method : EncryptionMethod.values()) {
if (method.isUnlimitedStrength()) {
continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default.
}
testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name());
testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
public class TestEncryptContent {
@Test
public void testRoundTrip() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent());
testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
for (final EncryptionMethod method : EncryptionMethod.values()) {
if (method.isUnlimitedStrength()) {
continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default.
}
testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name());
testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
testRunner.assertQueueEmpty();
testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
testRunner.enqueue(flowFile);
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
}
}
@Test
public void testDecryptSmallerThanSaltSize() {
final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
runner.enqueue(new byte[4]);
runner.run();
runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
}
@Test
public void testPGPDecrypt() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP_ASCII_ARMOR.name());
runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc"));
runner.run();
runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt"));
}
@Test
public void testValidation() {
final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
Collection<ValidationResult> results;
MockProcessContext pc;
results = new HashSet<>();
runner.enqueue(new byte[0]);
pc = (MockProcessContext) runner.getProcessContext();
results = pc.validate();
Assert.assertEquals(1, results.size());
for (final ValidationResult vr : results) {
Assert.assertTrue(vr.toString()
.contains(EncryptContent.PASSWORD.getDisplayName() + " is required when using algorithm"));
}
results = new HashSet<>();
runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name());
runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt");
runner.enqueue(new byte[0]);
pc = (MockProcessContext) runner.getProcessContext();
results = pc.validate();
Assert.assertEquals(1, results.size());
for (final ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains(
" encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both "
+ EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and "
+ EncryptContent.PUBLIC_KEY_USERID.getDisplayName()));
}
results = new HashSet<>();
runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID");
runner.enqueue(new byte[0]);
pc = (MockProcessContext) runner.getProcessContext();
results = pc.validate();
Assert.assertEquals(1, results.size());
for (final ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("does not contain user id USERID"));
}
runner.removeProperty(EncryptContent.PUBLIC_KEYRING);
runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID);
results = new HashSet<>();
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt");
runner.enqueue(new byte[0]);
pc = (MockProcessContext) runner.getProcessContext();
results = pc.validate();
Assert.assertEquals(1, results.size());
for (final ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains(
" decryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both "
+ EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and "
+ EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
}
results = new HashSet<>();
runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, "PASSWORD");
runner.enqueue(new byte[0]);
pc = (MockProcessContext) runner.getProcessContext();
results = pc.validate();
Assert.assertEquals(1, results.size());
for (final ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains(
" could not be opened with the provided " + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
}
}
}

View File

@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
This is some clear text.

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2
jA0EAwMC54x6GeEEmLy3ycE/pY5T6vawFMS9/Io8chEQGD8UNAbZFzqXcVEvvy5f
wJQggyuF6Lo1RZGnhxdGoz0do1DZzR0lVaTL8dR0/jtz/kRZ1omz+OxICuo9BaRX
M+fT2mdna5lhDGJmE7nCctDaGwXqjglEXPqOdi8j/tL225HViTKP1VmlKuu8AjiH
lMGuC65bqILSWCaE2jewCbsmjHPgLGmH9NN6EAo2kiFEMOrA+UYo35PuShkgQsZp
gA0m2A31JjLW28SUsNd1vLk5bWBZaIFA1UvhR7u0pagv3qtu5f8qls19nHnAT/bz
Kh2KrnIm0peWWVEPGQkFoK3Lt9vJTjmHdHPUXQHyg+SMN1PIGA2sxwiSrkQlAyon
uNg/I24ctydWU+qndz+ycDWR6zBziA09KHw7uKo5CDtTm+Zo3K9U9uf9y8iZ+AKd
vgF/4Nw2lJTqQfNtkmK+N+cKEyZNmJa4r+uDzJF/dCv8R5jGj2dBYRTLxj5tgllU
4GiwuJR6w09hK0S0oe9XTdcNciWigb12H9z6U6JOse+1S/fYoUa0CZRJg9Bgeqym
uuT/mNSKwRVcWN27vOGy+zGf0tqw6A5idLrK+8FZzd1sgxtKsgkYz8FcZgo9rq1f
PHR9KF4JhaGpqNJmEu/GucYIAgq3aeo9GoV/RpDZtoHAVBuqPwcDTzGHeiAoZ49U
6Q==
=H5nf
-----END PGP MESSAGE-----

View File

@ -145,6 +145,11 @@
<artifactId>bcprov-jdk16</artifactId>
<version>1.46</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpg-jdk16</artifactId>
<version>1.46</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>