NIFI-374: Route ProcessException's to failure

This commit is contained in:
Mark Payne 2015-04-29 17:02:08 -04:00
parent 6e5469c712
commit ab6794b29e
2 changed files with 75 additions and 48 deletions

View File

@ -74,35 +74,35 @@ public class EncryptContent extends AbstractProcessor {
public static final int DEFAULT_SALT_SIZE = 8;
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Specifies whether the content should be encrypted or decrypted")
.required(true)
.allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
.defaultValue(ENCRYPT_MODE)
.build();
.name("Mode")
.description("Specifies whether the content should be encrypted or decrypted")
.required(true)
.allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
.defaultValue(ENCRYPT_MODE)
.build();
public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder()
.name("Encryption Algorithm")
.description("The Encryption Algorithm to use")
.required(true)
.allowableValues(EncryptionMethod.values())
.defaultValue(EncryptionMethod.MD5_256AES.name())
.build();
.name("Encryption Algorithm")
.description("The Encryption Algorithm to use")
.required(true)
.allowableValues(EncryptionMethod.values())
.defaultValue(EncryptionMethod.MD5_256AES.name())
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The Password to use for encrypting or decrypting the data")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
.name("Password")
.description("The Password to use for encrypting or decrypting the data")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully encrypted or decrypted will be routed to success")
.build();
.name("success")
.description("Any FlowFile that is successfully encrypted or decrypted will be routed to success")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure")
.build();
.name("failure")
.description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@ -172,34 +172,40 @@ public class EncryptContent extends AbstractProcessor {
final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;
final StopWatch stopWatch = new StopWatch(true);
if (context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
final byte[] salt = new byte[saltSize];
secureRandom.nextBytes(salt);
final String mode = context.getProperty(MODE).getValue();
try {
if (mode.equalsIgnoreCase(ENCRYPT_MODE)) {
final byte[] salt = new byte[saltSize];
secureRandom.nextBytes(salt);
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
} catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
try {
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
} catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
logger.info("Successfully encrypted {}", new Object[]{flowFile});
} else {
if (flowFile.getSize() <= saltSize) {
logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
logger.info("successfully decrypted {}", new Object[]{flowFile});
}
flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
logger.info("Successfully encrypted {}", new Object[]{flowFile});
} else {
if (flowFile.getSize() <= saltSize) {
logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
logger.info("successfully decrypted {}", new Object[]{flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException pe) {
getLogger().error("Failed to {} {} due to {}; routing to failure", new Object[] {mode, flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
}
private static class DecryptCallback implements StreamCallback {

View File

@ -61,4 +61,25 @@ public class TestEncryptContent {
}
}
@Test
public void testDecryptNonEncryptedFile() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent());
testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
for (final EncryptionMethod method : EncryptionMethod.values()) {
if (method.isUnlimitedStrength()) {
continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default.
}
testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name());
testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.clearTransferState();
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
}
}
}