mirror of https://github.com/apache/nifi.git
NIFI-30 NIFI-374 EncryptContent support of PGP
This commit is contained in:
parent
53b6ac3333
commit
156d9409ff
|
@ -46,7 +46,10 @@ public enum EncryptionMethod {
|
|||
SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true),
|
||||
SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true),
|
||||
SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true),
|
||||
SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true);
|
||||
SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true),
|
||||
PGP("PGP", "BC", false),
|
||||
PGP_ASCII_ARMOR("PGP-ASCII-ARMOR", "BC", false);
|
||||
|
||||
private final String algorithm;
|
||||
private final String provider;
|
||||
private final boolean unlimitedStrength;
|
||||
|
|
|
@ -80,6 +80,10 @@
|
|||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcprov-jdk16</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcpg-jdk16</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
|
@ -186,6 +190,8 @@
|
|||
<exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude>
|
||||
<exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude>
|
||||
<exclude>src/test/resources/ScanAttribute/dictionary1</exclude>
|
||||
<exclude>src/test/resources/TestEncryptContent/text.txt</exclude>
|
||||
<exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude>
|
||||
<exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude>
|
||||
<exclude>src/test/resources/TestJson/json-sample.json</exclude>
|
||||
<exclude>src/test/resources/TestMergeContent/demarcate</exclude>
|
||||
|
|
|
@ -16,54 +16,51 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import java.security.Security;
|
||||
import java.text.Normalizer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.OpenPGPKeyBasedEncryptor;
|
||||
import org.apache.nifi.processors.standard.util.OpenPGPPasswordBasedEncryptor;
|
||||
import org.apache.nifi.processors.standard.util.PasswordBasedEncryptor;
|
||||
import org.apache.nifi.security.util.EncryptionMethod;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
|
||||
import javax.crypto.*;
|
||||
import javax.crypto.spec.PBEKeySpec;
|
||||
import javax.crypto.spec.PBEParameterSpec;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.InvalidAlgorithmParameterException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.Security;
|
||||
import java.text.Normalizer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@Tags({"encryption", "decryption", "password", "JCE"})
|
||||
@CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly generated salt")
|
||||
@Tags({"encryption", "decryption", "password", "JCE", "OpenPGP", "PGP", "GPG"})
|
||||
@CapabilityDescription("Encrypts or Decrypts a FlowFile using either symmetric encryption with a password and randomly generated salt, or asymmetric encryption using a public and secret key.")
|
||||
public class EncryptContent extends AbstractProcessor {
|
||||
|
||||
public static final String ENCRYPT_MODE = "Encrypt";
|
||||
public static final String DECRYPT_MODE = "Decrypt";
|
||||
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
|
||||
public static final int DEFAULT_SALT_SIZE = 8;
|
||||
|
||||
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
|
||||
.name("Mode")
|
||||
|
@ -82,13 +79,44 @@ public class EncryptContent extends AbstractProcessor {
|
|||
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("The Password to use for encrypting or decrypting the data")
|
||||
.required(true)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor PUBLIC_KEYRING = new PropertyDescriptor.Builder()
|
||||
.name("public-keyring-file")
|
||||
.displayName("Public Keyring File")
|
||||
.description("In a PGP encrypt mode, this keyring contains the public key of the recipient")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor PUBLIC_KEY_USERID = new PropertyDescriptor.Builder()
|
||||
.name("public-key-user-id")
|
||||
.displayName("Public Key User Id")
|
||||
.description("In a PGP encrypt mode, this user id of the recipient")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor PRIVATE_KEYRING = new PropertyDescriptor.Builder()
|
||||
.name("private-keyring-file")
|
||||
.displayName("Private Keyring File")
|
||||
.description("In a PGP decrypt mode, this keyring contains the private key of the recipient")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor PRIVATE_KEYRING_PASSPHRASE = new PropertyDescriptor.Builder()
|
||||
.name("private-keyring-passphrase")
|
||||
.displayName("Private Keyring Passphrase")
|
||||
.description("In a PGP decrypt mode, this is the private keyring passphrase")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build();
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
private Set<Relationship> relationships;
|
||||
|
@ -104,6 +132,10 @@ public class EncryptContent extends AbstractProcessor {
|
|||
properties.add(MODE);
|
||||
properties.add(ENCRYPTION_ALGORITHM);
|
||||
properties.add(PASSWORD);
|
||||
properties.add(PUBLIC_KEYRING);
|
||||
properties.add(PUBLIC_KEY_USERID);
|
||||
properties.add(PRIVATE_KEYRING);
|
||||
properties.add(PRIVATE_KEYRING_PASSPHRASE);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
|
@ -122,6 +154,85 @@ public class EncryptContent extends AbstractProcessor {
|
|||
return properties;
|
||||
}
|
||||
|
||||
public static boolean isPGPAlgorithm(String algorithm) {
|
||||
return algorithm.startsWith("PGP");
|
||||
}
|
||||
|
||||
public static boolean isPGPArmoredAlgorithm(String algorithm) {
|
||||
return isPGPAlgorithm(algorithm) && algorithm.endsWith("ASCII-ARMOR");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
|
||||
final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue();
|
||||
final String algorithm = EncryptionMethod.valueOf(method).getAlgorithm();
|
||||
final String password = context.getProperty(PASSWORD).getValue();
|
||||
if (isPGPAlgorithm(algorithm)) {
|
||||
if (password == null) {
|
||||
boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE);
|
||||
if (encrypt) {
|
||||
// need both public-keyring-file and public-key-user-id set
|
||||
String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue();
|
||||
String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue();
|
||||
if (publicKeyring == null || publicUserId == null) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
|
||||
.explanation(algorithm + " encryption without a " + PASSWORD.getDisplayName() + " requires both "
|
||||
+ PUBLIC_KEYRING.getDisplayName() + " and " + PUBLIC_KEY_USERID.getDisplayName())
|
||||
.build());
|
||||
} else {
|
||||
// verify the public keyring contains the user id
|
||||
try {
|
||||
if (OpenPGPKeyBasedEncryptor.getPublicKey(publicUserId, publicKeyring) == null) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
|
||||
.explanation(PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring
|
||||
+ " does not contain user id " + publicUserId)
|
||||
.build());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName())
|
||||
.explanation("Invalid " + PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring
|
||||
+ " because " + e.toString())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// need both private-keyring-file and private-keyring-passphrase set
|
||||
String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue();
|
||||
String keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue();
|
||||
if (privateKeyring == null || keyringPassphrase == null) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getName())
|
||||
.explanation(algorithm + " decryption without a " + PASSWORD.getDisplayName() + " requires both "
|
||||
+ PRIVATE_KEYRING.getDisplayName() + " and " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName())
|
||||
.build());
|
||||
} else {
|
||||
final String providerName = EncryptionMethod.valueOf(method).getProvider();
|
||||
// verify the passphrase works on the private keyring
|
||||
try {
|
||||
if (!OpenPGPKeyBasedEncryptor.validateKeyring(providerName, privateKeyring, keyringPassphrase.toCharArray())) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName())
|
||||
.explanation(PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring
|
||||
+ " could not be opened with the provided " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName())
|
||||
.build());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName())
|
||||
.explanation("Invalid " + PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring
|
||||
+ " because " + e.toString())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (password == null) {
|
||||
validationResults.add(new ValidationResult.Builder().subject(PASSWORD.getName())
|
||||
.explanation(PASSWORD.getDisplayName() + " is required when using algorithm " + algorithm).build());
|
||||
}
|
||||
}
|
||||
return validationResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
|
@ -134,130 +245,62 @@ public class EncryptContent extends AbstractProcessor {
|
|||
final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method);
|
||||
final String providerName = encryptionMethod.getProvider();
|
||||
final String algorithm = encryptionMethod.getAlgorithm();
|
||||
|
||||
final String password = context.getProperty(PASSWORD).getValue();
|
||||
final char[] normalizedPassword = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
|
||||
final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword);
|
||||
boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE);
|
||||
|
||||
final SecureRandom secureRandom;
|
||||
final SecretKeyFactory factory;
|
||||
final SecretKey secretKey;
|
||||
final Cipher cipher;
|
||||
Encryptor encryptor;
|
||||
StreamCallback callback;
|
||||
try {
|
||||
secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
|
||||
secureRandom.setSeed(System.currentTimeMillis());
|
||||
factory = SecretKeyFactory.getInstance(algorithm, providerName);
|
||||
secretKey = factory.generateSecret(pbeKeySpec);
|
||||
cipher = Cipher.getInstance(algorithm, providerName);
|
||||
} catch (final Exception e) {
|
||||
logger.error("failed to initialize Encryption/Decryption algorithm due to {}", new Object[]{e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final int algorithmBlockSize = cipher.getBlockSize();
|
||||
final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
|
||||
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
if (context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
|
||||
final byte[] salt = new byte[saltSize];
|
||||
secureRandom.nextBytes(salt);
|
||||
|
||||
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
|
||||
try {
|
||||
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
|
||||
} catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
|
||||
logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
|
||||
logger.info("Successfully encrypted {}", new Object[]{flowFile});
|
||||
if (isPGPAlgorithm(algorithm)) {
|
||||
String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue();
|
||||
String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue();
|
||||
if (encrypt && publicKeyring != null) {
|
||||
String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue();
|
||||
encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, publicKeyring, publicUserId, null, filename);
|
||||
} else if (!encrypt && privateKeyring != null) {
|
||||
char[] keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue().toCharArray();
|
||||
encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, privateKeyring, null, keyringPassphrase,
|
||||
filename);
|
||||
} else {
|
||||
if (flowFile.getSize() <= saltSize) {
|
||||
logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
|
||||
encryptor = new OpenPGPPasswordBasedEncryptor(algorithm, providerName, passphrase, filename);
|
||||
}
|
||||
} else {
|
||||
char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
|
||||
encryptor = new PasswordBasedEncryptor(algorithm, providerName, passphrase);
|
||||
}
|
||||
|
||||
if (encrypt) {
|
||||
callback = encryptor.getEncryptionCallback();
|
||||
} else {
|
||||
callback = encryptor.getDecryptionCallback();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to initialize {}cryption algorithm because - ", new Object[] { encrypt ? "en" : "de", e });
|
||||
session.rollback();
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
|
||||
logger.info("successfully decrypted {}", new Object[]{flowFile});
|
||||
}
|
||||
|
||||
try {
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
flowFile = session.write(flowFile, callback);
|
||||
logger.info("successfully {}crypted {}", new Object[] { encrypt ? "en" : "de", flowFile });
|
||||
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
|
||||
private static class DecryptCallback implements StreamCallback {
|
||||
|
||||
private final Cipher cipher;
|
||||
private final SecretKey secretKey;
|
||||
private final int saltSize;
|
||||
|
||||
public DecryptCallback(final Cipher cipher, final SecretKey secretKey, final int saltSize) {
|
||||
this.cipher = cipher;
|
||||
this.secretKey = secretKey;
|
||||
this.saltSize = saltSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||
final byte[] salt = new byte[saltSize];
|
||||
StreamUtils.fillBuffer(in, salt);
|
||||
|
||||
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
|
||||
try {
|
||||
cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
|
||||
final byte[] buffer = new byte[65536];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) > 0) {
|
||||
final byte[] decryptedBytes = cipher.update(buffer, 0, len);
|
||||
if (decryptedBytes != null) {
|
||||
out.write(decryptedBytes);
|
||||
} catch (ProcessException e) {
|
||||
logger.error("Cannot {}crypt {} - ", new Object[] { encrypt ? "en" : "de", flowFile, e });
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
out.write(cipher.doFinal());
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
public static interface Encryptor {
|
||||
public StreamCallback getEncryptionCallback() throws Exception;
|
||||
|
||||
public StreamCallback getDecryptionCallback() throws Exception;
|
||||
}
|
||||
|
||||
private static class EncryptCallback implements StreamCallback {
|
||||
|
||||
private final Cipher cipher;
|
||||
private final byte[] salt;
|
||||
|
||||
public EncryptCallback(final Cipher cipher, final byte[] salt) {
|
||||
this.cipher = cipher;
|
||||
this.salt = salt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||
out.write(salt);
|
||||
|
||||
final byte[] buffer = new byte[65536];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) > 0) {
|
||||
final byte[] encryptedBytes = cipher.update(buffer, 0, len);
|
||||
if (encryptedBytes != null) {
|
||||
out.write(encryptedBytes);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
out.write(cipher.doFinal());
|
||||
} catch (final IllegalBlockSizeException | BadPaddingException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,277 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.NoSuchProviderException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.zip.Deflater;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
import org.apache.nifi.processors.standard.EncryptContent;
|
||||
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
|
||||
import org.bouncycastle.bcpg.ArmoredOutputStream;
|
||||
import org.bouncycastle.openpgp.PGPCompressedData;
|
||||
import org.bouncycastle.openpgp.PGPCompressedDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedData;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedDataList;
|
||||
import org.bouncycastle.openpgp.PGPException;
|
||||
import org.bouncycastle.openpgp.PGPLiteralData;
|
||||
import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPObjectFactory;
|
||||
import org.bouncycastle.openpgp.PGPPrivateKey;
|
||||
import org.bouncycastle.openpgp.PGPPublicKey;
|
||||
import org.bouncycastle.openpgp.PGPPublicKeyEncryptedData;
|
||||
import org.bouncycastle.openpgp.PGPPublicKeyRing;
|
||||
import org.bouncycastle.openpgp.PGPPublicKeyRingCollection;
|
||||
import org.bouncycastle.openpgp.PGPSecretKey;
|
||||
import org.bouncycastle.openpgp.PGPSecretKeyRing;
|
||||
import org.bouncycastle.openpgp.PGPSecretKeyRingCollection;
|
||||
import org.bouncycastle.openpgp.PGPUtil;
|
||||
|
||||
public class OpenPGPKeyBasedEncryptor implements Encryptor {
|
||||
|
||||
private String algorithm;
|
||||
private String provider;
|
||||
private String keyring;
|
||||
private String userId;
|
||||
private char[] passphrase;
|
||||
private String filename;
|
||||
|
||||
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
|
||||
|
||||
public OpenPGPKeyBasedEncryptor(final String algorithm, final String provider, final String keyring, final String userId,
|
||||
final char[] passphrase, final String filename) {
|
||||
this.algorithm = algorithm;
|
||||
this.provider = provider;
|
||||
this.keyring = keyring;
|
||||
this.userId = userId;
|
||||
this.passphrase = passphrase;
|
||||
this.filename = filename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getEncryptionCallback() throws Exception {
|
||||
return new OpenPGPEncryptCallback(algorithm, provider, keyring, userId, filename);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getDecryptionCallback() throws Exception {
|
||||
return new OpenPGPDecryptCallback(provider, keyring, passphrase);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate secret keyring passphrase
|
||||
*/
|
||||
public static boolean validateKeyring(String provider, String secretKeyringFile, char[] passphrase) throws IOException,
|
||||
PGPException, NoSuchProviderException {
|
||||
PGPSecretKeyRingCollection pgpsec = new PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths
|
||||
.get(secretKeyringFile))));
|
||||
Iterator ringit = pgpsec.getKeyRings();
|
||||
while (ringit.hasNext()) {
|
||||
PGPSecretKeyRing secretkeyring = (PGPSecretKeyRing) ringit.next();
|
||||
PGPSecretKey secretkey = secretkeyring.getSecretKey();
|
||||
secretkey.extractPrivateKey(passphrase, provider);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the public key for a specific user id from a keyring.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static PGPPublicKey getPublicKey(String userId, String publicKeyring) throws IOException, PGPException {
|
||||
PGPPublicKey pubkey = null;
|
||||
PGPPublicKeyRingCollection pgppub = new
|
||||
PGPPublicKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths.get(publicKeyring))));
|
||||
|
||||
Iterator ringit = pgppub.getKeyRings();
|
||||
while (ringit.hasNext()) {
|
||||
PGPPublicKeyRing kring = (PGPPublicKeyRing) ringit.next();
|
||||
|
||||
Iterator keyit = kring.getPublicKeys();
|
||||
while (keyit.hasNext()) {
|
||||
pubkey = (PGPPublicKey) keyit.next();
|
||||
boolean userIdMatch = false;
|
||||
|
||||
Iterator userit = pubkey.getUserIDs();
|
||||
while (userit.hasNext()) {
|
||||
String id = userit.next().toString();
|
||||
if (id.contains(userId)) {
|
||||
userIdMatch = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pubkey.isEncryptionKey() && userIdMatch) {
|
||||
return pubkey;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private class OpenPGPDecryptCallback implements StreamCallback {
|
||||
|
||||
private String provider;
|
||||
private String secretKeyring;
|
||||
private char[] passphrase;
|
||||
|
||||
OpenPGPDecryptCallback(final String provider, final String keyring, final char[] passphrase) {
|
||||
this.provider = provider;
|
||||
this.secretKeyring = keyring;
|
||||
this.passphrase = passphrase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(InputStream in, OutputStream out) throws IOException {
|
||||
InputStream pgpin = PGPUtil.getDecoderStream(in);
|
||||
PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin);
|
||||
|
||||
Object obj = pgpFactory.nextObject();
|
||||
if (!(obj instanceof PGPEncryptedDataList)) {
|
||||
obj = pgpFactory.nextObject();
|
||||
if (!(obj instanceof PGPEncryptedDataList)) {
|
||||
throw new ProcessException("Invalid OpenPGP data");
|
||||
}
|
||||
}
|
||||
PGPEncryptedDataList encList = (PGPEncryptedDataList) obj;
|
||||
|
||||
PGPSecretKeyRingCollection pgpSecretKeyring;
|
||||
try {
|
||||
// open secret keyring file
|
||||
pgpSecretKeyring = new PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files
|
||||
.newInputStream(Paths.get(secretKeyring))));
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException("Invalid secret keyring - " + e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
PGPPrivateKey privateKey = null;
|
||||
PGPPublicKeyEncryptedData encData = null;
|
||||
|
||||
// find the secret key in the encrypted data
|
||||
Iterator it = encList.getEncryptedDataObjects();
|
||||
while (privateKey == null && it.hasNext()) {
|
||||
obj = it.next();
|
||||
if (!(obj instanceof PGPPublicKeyEncryptedData)) {
|
||||
throw new ProcessException("Invalid OpenPGP data");
|
||||
}
|
||||
encData = (PGPPublicKeyEncryptedData) obj;
|
||||
PGPSecretKey secretkey = pgpSecretKeyring.getSecretKey(encData.getKeyID());
|
||||
if (secretkey != null) {
|
||||
privateKey = secretkey.extractPrivateKey(passphrase, provider);
|
||||
}
|
||||
}
|
||||
if (privateKey == null) {
|
||||
throw new ProcessException("Secret keyring does not contain the key required to decrypt");
|
||||
}
|
||||
|
||||
InputStream clearData = encData.getDataStream(privateKey, provider);
|
||||
PGPObjectFactory clearFactory = new PGPObjectFactory(clearData);
|
||||
|
||||
obj = clearFactory.nextObject();
|
||||
if (obj instanceof PGPCompressedData) {
|
||||
PGPCompressedData compData = (PGPCompressedData) obj;
|
||||
clearFactory = new PGPObjectFactory(compData.getDataStream());
|
||||
obj = clearFactory.nextObject();
|
||||
}
|
||||
PGPLiteralData literal = (PGPLiteralData) obj;
|
||||
|
||||
InputStream lis = literal.getInputStream();
|
||||
final byte[] buffer = new byte[4096];
|
||||
int len;
|
||||
while ((len = lis.read(buffer)) >= 0) {
|
||||
out.write(buffer, 0, len);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class OpenPGPEncryptCallback implements StreamCallback {
|
||||
|
||||
private String algorithm;
|
||||
private String provider;
|
||||
private String publicKeyring;
|
||||
private String userId;
|
||||
private String filename;
|
||||
|
||||
OpenPGPEncryptCallback(final String algorithm, final String provider, final String keyring, final String userId,
|
||||
final String filename) {
|
||||
this.algorithm = algorithm;
|
||||
this.provider = provider;
|
||||
this.publicKeyring = keyring;
|
||||
this.userId = userId;
|
||||
this.filename = filename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(InputStream in, OutputStream out) throws IOException {
|
||||
PGPPublicKey publicKey;
|
||||
try {
|
||||
publicKey = getPublicKey(userId, publicKeyring);
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException("Invalid public keyring - " + e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
|
||||
|
||||
OutputStream output = out;
|
||||
if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) {
|
||||
output = new ArmoredOutputStream(out);
|
||||
}
|
||||
|
||||
PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false,
|
||||
secureRandom, provider);
|
||||
encGenerator.addMethod(publicKey);
|
||||
OutputStream encOut = encGenerator.open(output, new byte[65536]);
|
||||
|
||||
PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED);
|
||||
OutputStream compOut = compData.open(encOut, new byte[65536]);
|
||||
|
||||
PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator();
|
||||
OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]);
|
||||
|
||||
final byte[] buffer = new byte[4096];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) >= 0) {
|
||||
literalOut.write(buffer, 0, len);
|
||||
}
|
||||
|
||||
literalOut.close();
|
||||
compOut.close();
|
||||
encOut.close();
|
||||
output.close();
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Date;
|
||||
import java.util.zip.Deflater;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
import org.apache.nifi.processors.standard.EncryptContent;
|
||||
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
|
||||
import org.bouncycastle.bcpg.ArmoredOutputStream;
|
||||
import org.bouncycastle.openpgp.PGPCompressedData;
|
||||
import org.bouncycastle.openpgp.PGPCompressedDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedData;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPEncryptedDataList;
|
||||
import org.bouncycastle.openpgp.PGPLiteralData;
|
||||
import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
|
||||
import org.bouncycastle.openpgp.PGPObjectFactory;
|
||||
import org.bouncycastle.openpgp.PGPPBEEncryptedData;
|
||||
import org.bouncycastle.openpgp.PGPUtil;
|
||||
|
||||
public class OpenPGPPasswordBasedEncryptor implements Encryptor {
|
||||
|
||||
private String algorithm;
|
||||
private String provider;
|
||||
private char[] password;
|
||||
private String filename;
|
||||
|
||||
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
|
||||
|
||||
public OpenPGPPasswordBasedEncryptor(final String algorithm, final String provider, final char[] passphrase, final String filename) {
|
||||
this.algorithm = algorithm;
|
||||
this.provider = provider;
|
||||
this.password = passphrase;
|
||||
this.filename = filename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getEncryptionCallback() throws Exception {
|
||||
return new OpenPGPEncryptCallback(algorithm, provider, password, filename);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getDecryptionCallback() throws Exception {
|
||||
return new OpenPGPDecryptCallback(provider, password);
|
||||
}
|
||||
|
||||
private class OpenPGPDecryptCallback implements StreamCallback {
|
||||
|
||||
private String provider;
|
||||
private char[] password;
|
||||
|
||||
OpenPGPDecryptCallback(final String provider, final char[] password) {
|
||||
this.provider = provider;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(InputStream in, OutputStream out) throws IOException {
|
||||
InputStream pgpin = PGPUtil.getDecoderStream(in);
|
||||
PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin);
|
||||
|
||||
Object obj = pgpFactory.nextObject();
|
||||
if (!(obj instanceof PGPEncryptedDataList)) {
|
||||
obj = pgpFactory.nextObject();
|
||||
if (!(obj instanceof PGPEncryptedDataList)) {
|
||||
throw new ProcessException("Invalid OpenPGP data");
|
||||
}
|
||||
}
|
||||
PGPEncryptedDataList encList = (PGPEncryptedDataList) obj;
|
||||
|
||||
obj = encList.get(0);
|
||||
if (!(obj instanceof PGPPBEEncryptedData)) {
|
||||
throw new ProcessException("Invalid OpenPGP data");
|
||||
}
|
||||
PGPPBEEncryptedData encData = (PGPPBEEncryptedData) obj;
|
||||
|
||||
try {
|
||||
InputStream clearData = encData.getDataStream(password, provider);
|
||||
PGPObjectFactory clearFactory = new PGPObjectFactory(clearData);
|
||||
|
||||
obj = clearFactory.nextObject();
|
||||
if (obj instanceof PGPCompressedData) {
|
||||
PGPCompressedData compData = (PGPCompressedData) obj;
|
||||
clearFactory = new PGPObjectFactory(compData.getDataStream());
|
||||
obj = clearFactory.nextObject();
|
||||
}
|
||||
PGPLiteralData literal = (PGPLiteralData) obj;
|
||||
|
||||
InputStream lis = literal.getInputStream();
|
||||
final byte[] buffer = new byte[4096];
|
||||
int len;
|
||||
while ((len = lis.read(buffer)) >= 0) {
|
||||
out.write(buffer, 0, len);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class OpenPGPEncryptCallback implements StreamCallback {
|
||||
|
||||
private String algorithm;
|
||||
private String provider;
|
||||
private char[] password;
|
||||
private String filename;
|
||||
|
||||
OpenPGPEncryptCallback(final String algorithm, final String provider, final char[] password, final String filename) {
|
||||
this.algorithm = algorithm;
|
||||
this.provider = provider;
|
||||
this.password = password;
|
||||
this.filename = filename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(InputStream in, OutputStream out) throws IOException {
|
||||
try {
|
||||
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
|
||||
|
||||
OutputStream output = out;
|
||||
if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) {
|
||||
output = new ArmoredOutputStream(out);
|
||||
}
|
||||
|
||||
PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false,
|
||||
secureRandom, provider);
|
||||
encGenerator.addMethod(password);
|
||||
OutputStream encOut = encGenerator.open(output, new byte[65536]);
|
||||
|
||||
PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED);
|
||||
OutputStream compOut = compData.open(encOut, new byte[65536]);
|
||||
|
||||
PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator();
|
||||
OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]);
|
||||
|
||||
final byte[] buffer = new byte[4096];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) >= 0) {
|
||||
literalOut.write(buffer, 0, len);
|
||||
}
|
||||
|
||||
literalOut.close();
|
||||
compOut.close();
|
||||
encOut.close();
|
||||
output.close();
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.SecureRandom;
|
||||
|
||||
import javax.crypto.BadPaddingException;
|
||||
import javax.crypto.Cipher;
|
||||
import javax.crypto.IllegalBlockSizeException;
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.crypto.SecretKeyFactory;
|
||||
import javax.crypto.spec.PBEKeySpec;
|
||||
import javax.crypto.spec.PBEParameterSpec;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
||||
public class PasswordBasedEncryptor implements Encryptor {
|
||||
|
||||
private Cipher cipher;
|
||||
private int saltSize;
|
||||
private SecretKey secretKey;
|
||||
|
||||
public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
|
||||
public static final int DEFAULT_SALT_SIZE = 8;
|
||||
|
||||
public PasswordBasedEncryptor(final String algorithm, final String providerName, final char[] password) {
|
||||
super();
|
||||
try {
|
||||
// initialize cipher
|
||||
this.cipher = Cipher.getInstance(algorithm, providerName);
|
||||
int algorithmBlockSize = cipher.getBlockSize();
|
||||
this.saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
|
||||
|
||||
// initialize SecretKey from password
|
||||
PBEKeySpec pbeKeySpec = new PBEKeySpec(password);
|
||||
SecretKeyFactory factory = SecretKeyFactory.getInstance(algorithm, providerName);
|
||||
this.secretKey = factory.generateSecret(pbeKeySpec);
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getEncryptionCallback() throws ProcessException {
|
||||
try {
|
||||
byte[] salt = new byte[saltSize];
|
||||
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
|
||||
secureRandom.setSeed(System.currentTimeMillis());
|
||||
secureRandom.nextBytes(salt);
|
||||
return new EncryptCallback(salt);
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamCallback getDecryptionCallback() throws ProcessException {
|
||||
return new DecryptCallback();
|
||||
}
|
||||
|
||||
private class DecryptCallback implements StreamCallback {
|
||||
|
||||
public DecryptCallback() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||
final byte[] salt = new byte[saltSize];
|
||||
try {
|
||||
StreamUtils.fillBuffer(in, salt);
|
||||
} catch (final EOFException e) {
|
||||
throw new ProcessException("Cannot decrypt because file size is smaller than salt size", e);
|
||||
}
|
||||
|
||||
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
|
||||
try {
|
||||
cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
|
||||
final byte[] buffer = new byte[65536];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) > 0) {
|
||||
final byte[] decryptedBytes = cipher.update(buffer, 0, len);
|
||||
if (decryptedBytes != null) {
|
||||
out.write(decryptedBytes);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
out.write(cipher.doFinal());
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class EncryptCallback implements StreamCallback {
|
||||
|
||||
private final byte[] salt;
|
||||
|
||||
public EncryptCallback(final byte[] salt) {
|
||||
this.salt = salt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
|
||||
try {
|
||||
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
|
||||
out.write(salt);
|
||||
|
||||
final byte[] buffer = new byte[65536];
|
||||
int len;
|
||||
while ((len = in.read(buffer)) > 0) {
|
||||
final byte[] encryptedBytes = cipher.update(buffer, 0, len);
|
||||
if (encryptedBytes != null) {
|
||||
out.write(encryptedBytes);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
out.write(cipher.doFinal());
|
||||
} catch (final IllegalBlockSizeException | BadPaddingException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>EvaluateJsonPath</title>
|
||||
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<!-- Processor Documentation ================================================== -->
|
||||
<p>
|
||||
<strong>Note:</strong> This processor supports OpenPGP algorithms that are compatible with third party programs.
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -16,16 +16,19 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processors.standard.EncryptContent;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.security.util.EncryptionMethod;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestEncryptContent {
|
||||
|
@ -62,4 +65,99 @@ public class TestEncryptContent {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecryptSmallerThanSaltSize() {
|
||||
TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
|
||||
runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
|
||||
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
|
||||
runner.enqueue(new byte[4]);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPGPDecrypt() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
|
||||
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
|
||||
runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP_ASCII_ARMOR.name());
|
||||
runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc"));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidation() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class);
|
||||
Collection<ValidationResult> results;
|
||||
MockProcessContext pc;
|
||||
|
||||
results = new HashSet<>();
|
||||
runner.enqueue(new byte[0]);
|
||||
pc = (MockProcessContext) runner.getProcessContext();
|
||||
results = pc.validate();
|
||||
Assert.assertEquals(1, results.size());
|
||||
for (ValidationResult vr : results) {
|
||||
Assert.assertTrue(vr.toString()
|
||||
.contains(EncryptContent.PASSWORD.getDisplayName() + " is required when using algorithm"));
|
||||
}
|
||||
|
||||
results = new HashSet<>();
|
||||
runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name());
|
||||
runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt");
|
||||
runner.enqueue(new byte[0]);
|
||||
pc = (MockProcessContext) runner.getProcessContext();
|
||||
results = pc.validate();
|
||||
Assert.assertEquals(1, results.size());
|
||||
for (ValidationResult vr : results) {
|
||||
Assert.assertTrue(vr.toString().contains(
|
||||
" encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both "
|
||||
+ EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and "
|
||||
+ EncryptContent.PUBLIC_KEY_USERID.getDisplayName()));
|
||||
}
|
||||
|
||||
results = new HashSet<>();
|
||||
runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID");
|
||||
runner.enqueue(new byte[0]);
|
||||
pc = (MockProcessContext) runner.getProcessContext();
|
||||
results = pc.validate();
|
||||
Assert.assertEquals(1, results.size());
|
||||
for (ValidationResult vr : results) {
|
||||
Assert.assertTrue(vr.toString().contains("does not contain user id USERID"));
|
||||
}
|
||||
|
||||
runner.removeProperty(EncryptContent.PUBLIC_KEYRING);
|
||||
runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID);
|
||||
|
||||
results = new HashSet<>();
|
||||
runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
|
||||
runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt");
|
||||
runner.enqueue(new byte[0]);
|
||||
pc = (MockProcessContext) runner.getProcessContext();
|
||||
results = pc.validate();
|
||||
Assert.assertEquals(1, results.size());
|
||||
for (ValidationResult vr : results) {
|
||||
Assert.assertTrue(vr.toString().contains(
|
||||
" decryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both "
|
||||
+ EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and "
|
||||
+ EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
|
||||
|
||||
}
|
||||
|
||||
results = new HashSet<>();
|
||||
runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, "PASSWORD");
|
||||
runner.enqueue(new byte[0]);
|
||||
pc = (MockProcessContext) runner.getProcessContext();
|
||||
results = pc.validate();
|
||||
Assert.assertEquals(1, results.size());
|
||||
for (ValidationResult vr : results) {
|
||||
Assert.assertTrue(vr.toString().contains(
|
||||
" could not be opened with the provided " + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
This is some clear text.
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
-----BEGIN PGP MESSAGE-----
|
||||
Version: GnuPG v2
|
||||
|
||||
jA0EAwMC54x6GeEEmLy3ycE/pY5T6vawFMS9/Io8chEQGD8UNAbZFzqXcVEvvy5f
|
||||
wJQggyuF6Lo1RZGnhxdGoz0do1DZzR0lVaTL8dR0/jtz/kRZ1omz+OxICuo9BaRX
|
||||
M+fT2mdna5lhDGJmE7nCctDaGwXqjglEXPqOdi8j/tL225HViTKP1VmlKuu8AjiH
|
||||
lMGuC65bqILSWCaE2jewCbsmjHPgLGmH9NN6EAo2kiFEMOrA+UYo35PuShkgQsZp
|
||||
gA0m2A31JjLW28SUsNd1vLk5bWBZaIFA1UvhR7u0pagv3qtu5f8qls19nHnAT/bz
|
||||
Kh2KrnIm0peWWVEPGQkFoK3Lt9vJTjmHdHPUXQHyg+SMN1PIGA2sxwiSrkQlAyon
|
||||
uNg/I24ctydWU+qndz+ycDWR6zBziA09KHw7uKo5CDtTm+Zo3K9U9uf9y8iZ+AKd
|
||||
vgF/4Nw2lJTqQfNtkmK+N+cKEyZNmJa4r+uDzJF/dCv8R5jGj2dBYRTLxj5tgllU
|
||||
4GiwuJR6w09hK0S0oe9XTdcNciWigb12H9z6U6JOse+1S/fYoUa0CZRJg9Bgeqym
|
||||
uuT/mNSKwRVcWN27vOGy+zGf0tqw6A5idLrK+8FZzd1sgxtKsgkYz8FcZgo9rq1f
|
||||
PHR9KF4JhaGpqNJmEu/GucYIAgq3aeo9GoV/RpDZtoHAVBuqPwcDTzGHeiAoZ49U
|
||||
6Q==
|
||||
=H5nf
|
||||
-----END PGP MESSAGE-----
|
|
@ -191,6 +191,11 @@
|
|||
<artifactId>bcprov-jdk16</artifactId>
|
||||
<version>1.46</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcpg-jdk16</artifactId>
|
||||
<version>1.46</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
|
|
Loading…
Reference in New Issue