From c526656a228389aa972ce7ebc1df037333000516 Mon Sep 17 00:00:00 2001 From: James Wing Date: Mon, 11 Jan 2016 13:39:15 -0800 Subject: [PATCH 01/10] NIFI-1283 Fixing ControllerStatusReportingTask loggers to use fully-qualified class name --- .../apache/nifi/controller/ControllerStatusReportingTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java index 5b95094828..4c9b4b1306 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java @@ -53,8 +53,8 @@ public class ControllerStatusReportingTask extends AbstractReportingTask { .defaultValue("true") .build(); - private static final Logger processorLogger = LoggerFactory.getLogger(ControllerStatusReportingTask.class.getSimpleName() + ".Processors"); - private static final Logger connectionLogger = LoggerFactory.getLogger(ControllerStatusReportingTask.class.getSimpleName() + ".Connections"); + private static final Logger processorLogger = LoggerFactory.getLogger(ControllerStatusReportingTask.class + ".Processors"); + private static final Logger connectionLogger = LoggerFactory.getLogger(ControllerStatusReportingTask.class + ".Connections"); private static final String PROCESSOR_LINE_FORMAT_NO_DELTA = "| %1$-30.30s | %2$-36.36s | %3$-24.24s | %4$10.10s | %5$19.19s | %6$19.19s | %7$12.12s | %8$13.13s | %9$5.5s | %10$12.12s |\n"; private static final String PROCESSOR_LINE_FORMAT_WITH_DELTA = "| %1$-30.30s | %2$-36.36s | %3$-24.24s | %4$10.10s | %5$43.43s | %6$43.43s | %7$28.28s | %8$30.30s | %9$14.14s | %10$28.28s |\n"; From 6b54753dbb9bf6b2694a0cee7ac485fdcc8c3d01 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Mon, 21 Dec 2015 16:05:13 -0500 Subject: [PATCH 02/10] NIFI-1316 adding option to DetectDuplicate to not cache the entry identifier Signed-off-by: Aldrin Piri --- .../processors/standard/DetectDuplicate.java | 33 +++++++- .../standard/TestDetectDuplicate.java | 78 ++++++++++++++++++- 2 files changed, 104 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java index 39dc725c92..71195d9dbd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java @@ -56,7 +56,7 @@ import org.apache.nifi.processor.util.StandardValidators; @Tags({"hash", "dupe", "duplicate", "dedupe"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. " - + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's" + + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's " + "\"description\", which is specified in the property. If the FlowFile is not determined to be a duplicate, the Processor " + "routes the FlowFile to 'non-duplicate'") @WritesAttribute(attribute = "original.flowfile.description", description = "All FlowFiles routed to the duplicate relationship will have " @@ -101,6 +101,16 @@ public class DetectDuplicate extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); + public static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Cache The Entry Identifier") + .description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, " + + "the processor would only check for duplicates and not cache the Entry Identifier, requiring another " + + "processor to add identifiers to the distributed cache.") + .required(false) + .allowableValues("true","false") + .defaultValue("true") + .build(); + public static final Relationship REL_DUPLICATE = new Relationship.Builder() .name("duplicate") .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship") @@ -134,6 +144,7 @@ public class DetectDuplicate extends AbstractProcessor { descriptors.add(FLOWFILE_DESCRIPTION); descriptors.add(AGE_OFF_DURATION); descriptors.add(DISTRIBUTED_CACHE_SERVICE); + descriptors.add(CACHE_IDENTIFIER); return descriptors; } @@ -164,14 +175,28 @@ public class DetectDuplicate extends AbstractProcessor { try { final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue(); final CacheValue cacheValue = new CacheValue(flowFileDescription, now); - final CacheValue originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); + final CacheValue originalCacheValue; + + final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean(); + if (shouldCacheIdentifier) { + originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); + } else { + originalCacheValue = cache.get(cacheKey, keySerializer, valueDeserializer); + } + boolean duplicate = originalCacheValue != null; if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) { boolean status = cache.remove(cacheKey, keySerializer); logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status}); - // this should typically result in duplicate being false...but, better safe than sorry - duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer); + + // both should typically result in duplicate being false...but, better safe than sorry + if (shouldCacheIdentifier) { + duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer); + } else { + duplicate = cache.containsKey(cacheKey, keySerializer); + } } + if (duplicate) { session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME); String originalFlowFileDescription = originalCacheValue.getDescription(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 12a5cd4ad9..54e6a29594 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -73,7 +73,6 @@ public class TestDetectDuplicate { runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); runner.clearTransferState(); - client.exists = true; runner.enqueue(new byte[]{}, props); runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); @@ -101,7 +100,6 @@ public class TestDetectDuplicate { runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); runner.clearTransferState(); - client.exists = true; Thread.sleep(3000); runner.enqueue(new byte[]{}, props); runner.run(); @@ -120,6 +118,72 @@ public class TestDetectDuplicate { return client; } + @Test + public void testDuplicateNoCache() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); + final Map props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); + runner.enableControllerService(client); + + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.clearTransferState(); + + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "true"); + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + runner.clearTransferState(); + + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + + @Test + public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException { + + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); + runner.enableControllerService(client); + + final Map props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); + + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + + runner.clearTransferState(); + Thread.sleep(3000); + + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { boolean exists = false; @@ -150,6 +214,7 @@ public class TestDetectDuplicate { } cacheValue = value; + exists = true; return true; } @@ -160,6 +225,7 @@ public class TestDetectDuplicate { return (V) cacheValue; } cacheValue = value; + exists = true; return null; } @@ -170,7 +236,11 @@ public class TestDetectDuplicate { @Override public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - return null; + if (exists) { + return (V) cacheValue; + } else { + return null; + } } @Override @@ -181,6 +251,8 @@ public class TestDetectDuplicate { @Override public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + cacheValue = value; + exists = true; } } From ffbfffce6dd381ffdac5704ef2279f1fe5345e89 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Tue, 22 Dec 2015 19:03:09 -0800 Subject: [PATCH 03/10] NIFI-1324: Changed Maven dependencies for BouncyCastle bcprov and bcpg from jdk16:1.46 to jdk15on:1.53 (kept nifi-web-security on jdk16:1.46 because jdk15on:1.53 splits OCSP logic into new module bcpkix). Added individual unit tests for PGP public keyring validation. Passes all legacy unit tests. Added TODOs for customizable brick encryption and refactoring shared code. Cleaned up magic numbers to constants. Added unit tests for OpenPGPPasswordBasedEncryptor (internal consistency and legacy file decrypt). Began refactoring shared encrypt code from OpenPGP* implementations. Extracted encrypt utility method from OpenPGPPasswordBasedEncryptor to PGPUtil class. Added test resources (signed and unsigned key-encrypted files). Added unit tests for OpenPGPKeyBasedEncryptor (internal consistency and external file decrypt). Changed BC dependency for nifi-web-security to bcprov-jdk15on:1.53 and bcpkix-jdk15on:1.53. Updated OCSPValidator to use new BC logic for OCSP validation. This code compiles but should be fully audited, as the legacy OCSP validation was not completely implemented. Added skeleton of OCSP validator unit tests with successful keypair and certificate generation and signing code. Added further unit tests for issued certificates. Annotated unimplemented unit tests with note about Groovy integration. Refactored Jersey call in OCSPCertificateValidator to internal method. Added toString() to NiFi local OcspRequest. Implemented positive & negative unit tests with cache injection for valid/revoked OCSP certificate. Resolved contrib-check issues. Removed ignored code in unit test. Signed-off-by: Matt Gilman --- .../nifi-framework-core/pom.xml | 2 +- .../nifi-web/nifi-web-security/pom.xml | 6 +- .../x509/ocsp/OcspCertificateValidator.java | 125 ++++--- .../web/security/x509/ocsp/OcspRequest.java | 9 + .../ocsp/OcspCertificateValidatorTest.java | 311 ++++++++++++++++++ .../nifi-standard-processors/pom.xml | 4 +- .../processors/standard/EncryptContent.java | 20 +- .../util/OpenPGPKeyBasedEncryptor.java | 272 ++++++++++----- .../util/OpenPGPPasswordBasedEncryptor.java | 111 +++---- .../processors/standard/util/PGPUtil.java | 89 +++++ .../standard/TestEncryptContent.java | 147 ++++++++- .../util/OpenPGPKeyBasedEncryptorTest.java | 129 ++++++++ .../OpenPGPPasswordBasedEncryptorTest.java | 123 +++++++ .../resources/TestEncryptContent/pubring.gpg | Bin 0 -> 1250 bytes .../resources/TestEncryptContent/secring.gpg | Bin 0 -> 2628 bytes .../resources/TestEncryptContent/text.txt.gpg | Bin 0 -> 1481 bytes .../TestEncryptContent/text.txt.unsigned.gpg | Bin 0 -> 824 bytes pom.xml | 13 +- 18 files changed, 1128 insertions(+), 233 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/test/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidatorTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptorTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/pubring.gpg create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/secring.gpg create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/text.txt.gpg create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/text.txt.unsigned.gpg diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 783235a317..dc694ebb72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -85,7 +85,7 @@ org.bouncycastle - bcprov-jdk16 + bcprov-jdk15on com.sun.jersey diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml index d93e58eb69..5496f15bb9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml @@ -84,7 +84,11 @@ org.bouncycastle - bcprov-jdk16 + bcprov-jdk15on + + + org.bouncycastle + bcpkix-jdk15on com.google.guava diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidator.java index 832a63c36d..b0762b5c12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidator.java @@ -27,40 +27,51 @@ import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.framework.security.util.SslContextFactory; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.security.x509.ocsp.OcspStatus.ValidationStatus; +import org.apache.nifi.web.security.x509.ocsp.OcspStatus.VerificationStatus; +import org.apache.nifi.web.util.WebUtils; +import org.bouncycastle.asn1.DEROctetString; +import org.bouncycastle.asn1.ocsp.OCSPObjectIdentifiers; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.Extensions; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.ocsp.BasicOCSPResp; +import org.bouncycastle.cert.ocsp.CertificateID; +import org.bouncycastle.cert.ocsp.CertificateStatus; +import org.bouncycastle.cert.ocsp.OCSPException; +import org.bouncycastle.cert.ocsp.OCSPReq; +import org.bouncycastle.cert.ocsp.OCSPReqBuilder; +import org.bouncycastle.cert.ocsp.OCSPResp; +import org.bouncycastle.cert.ocsp.OCSPRespBuilder; +import org.bouncycastle.cert.ocsp.RevokedStatus; +import org.bouncycastle.cert.ocsp.SingleResp; +import org.bouncycastle.operator.DigestCalculatorProvider; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentVerifierProviderBuilder; +import org.bouncycastle.operator.jcajce.JcaDigestCalculatorProviderBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; +import javax.security.auth.x500.X500Principal; import java.io.FileInputStream; import java.io.IOException; import java.math.BigInteger; import java.net.URI; import java.security.KeyStore; -import java.security.NoSuchProviderException; +import java.security.cert.CertificateException; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509TrustManager; -import javax.security.auth.x500.X500Principal; -import org.apache.nifi.framework.security.util.SslContextFactory; -import org.apache.nifi.web.security.x509.ocsp.OcspStatus.ValidationStatus; -import org.apache.nifi.web.security.x509.ocsp.OcspStatus.VerificationStatus; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.util.WebUtils; -import org.apache.commons.lang3.StringUtils; -import org.bouncycastle.ocsp.BasicOCSPResp; -import org.bouncycastle.ocsp.CertificateID; -import org.bouncycastle.ocsp.CertificateStatus; -import org.bouncycastle.ocsp.OCSPException; -import org.bouncycastle.ocsp.OCSPReq; -import org.bouncycastle.ocsp.OCSPReqGenerator; -import org.bouncycastle.ocsp.OCSPResp; -import org.bouncycastle.ocsp.OCSPRespStatus; -import org.bouncycastle.ocsp.RevokedStatus; -import org.bouncycastle.ocsp.SingleResp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class OcspCertificateValidator { @@ -89,7 +100,7 @@ public class OcspCertificateValidator { // attempt to parse the specified va url validationAuthorityURI = URI.create(rawValidationAuthorityUrl); - // connection detials + // connection details final ClientConfig config = new DefaultClientConfig(); config.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, READ_TIMEOUT); config.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, CONNECT_TIMEOUT); @@ -283,16 +294,24 @@ public class OcspCertificateValidator { try { // prepare the request final BigInteger subjectSerialNumber = subjectCertificate.getSerialNumber(); - final CertificateID certificateId = new CertificateID(CertificateID.HASH_SHA1, issuerCertificate, subjectSerialNumber); + final DigestCalculatorProvider calculatorProviderBuilder = new JcaDigestCalculatorProviderBuilder().setProvider("BC").build(); + final CertificateID certificateId = new CertificateID(calculatorProviderBuilder.get(CertificateID.HASH_SHA1), + new X509CertificateHolder(issuerCertificate.getEncoded()), + subjectSerialNumber); // generate the request - final OCSPReqGenerator requestGenerator = new OCSPReqGenerator(); + final OCSPReqBuilder requestGenerator = new OCSPReqBuilder(); requestGenerator.addRequest(certificateId); - final OCSPReq ocspRequest = requestGenerator.generate(); + + // Create a nonce to avoid replay attack + BigInteger nonce = BigInteger.valueOf(System.currentTimeMillis()); + Extension ext = new Extension(OCSPObjectIdentifiers.id_pkix_ocsp_nonce, true, new DEROctetString(nonce.toByteArray())); + requestGenerator.setRequestExtensions(new Extensions(new Extension[]{ext})); + + final OCSPReq ocspRequest = requestGenerator.build(); // perform the request - final WebResource resource = client.resource(validationAuthorityURI); - final ClientResponse response = resource.header(CONTENT_TYPE_HEADER, OCSP_REQUEST_CONTENT_TYPE).post(ClientResponse.class, ocspRequest.getEncoded()); + final ClientResponse response = getClientResponse(ocspRequest); // ensure the request was completed successfully if (ClientResponse.Status.OK.getStatusCode() != response.getStatusInfo().getStatusCode()) { @@ -305,22 +324,22 @@ public class OcspCertificateValidator { // verify the response status switch (ocspResponse.getStatus()) { - case OCSPRespStatus.SUCCESSFUL: + case OCSPRespBuilder.SUCCESSFUL: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.Successful); break; - case OCSPRespStatus.INTERNAL_ERROR: + case OCSPRespBuilder.INTERNAL_ERROR: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.InternalError); break; - case OCSPRespStatus.MALFORMED_REQUEST: + case OCSPRespBuilder.MALFORMED_REQUEST: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.MalformedRequest); break; - case OCSPRespStatus.SIGREQUIRED: + case OCSPRespBuilder.SIG_REQUIRED: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.SignatureRequired); break; - case OCSPRespStatus.TRY_LATER: + case OCSPRespBuilder.TRY_LATER: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.TryLater); break; - case OCSPRespStatus.UNAUTHORIZED: + case OCSPRespBuilder.UNAUTHORIZED: ocspStatus.setResponseStatus(OcspStatus.ResponseStatus.Unauthorized); break; default: @@ -329,7 +348,7 @@ public class OcspCertificateValidator { } // only proceed if the response was successful - if (ocspResponse.getStatus() != OCSPRespStatus.SUCCESSFUL) { + if (ocspResponse.getStatus() != OCSPRespBuilder.SUCCESSFUL) { logger.warn(String.format("OCSP request was unsuccessful (%s).", ocspStatus.getResponseStatus().toString())); return ocspStatus; } @@ -337,7 +356,7 @@ public class OcspCertificateValidator { // ensure the appropriate response object final Object ocspResponseObject = ocspResponse.getResponseObject(); if (ocspResponseObject == null || !(ocspResponseObject instanceof BasicOCSPResp)) { - logger.warn(String.format("Unexcepted OCSP response object: %s", ocspResponseObject)); + logger.warn(String.format("Unexpected OCSP response object: %s", ocspResponseObject)); return ocspStatus; } @@ -345,9 +364,9 @@ public class OcspCertificateValidator { final BasicOCSPResp basicOcspResponse = (BasicOCSPResp) ocspResponse.getResponseObject(); // attempt to locate the responder certificate - final X509Certificate[] responderCertificates = basicOcspResponse.getCerts(null); + final X509CertificateHolder[] responderCertificates = basicOcspResponse.getCerts(); if (responderCertificates.length != 1) { - logger.warn(String.format("Unexcepted number of OCSP responder certificates: %s", responderCertificates.length)); + logger.warn(String.format("Unexpected number of OCSP responder certificates: %s", responderCertificates.length)); return ocspStatus; } @@ -355,7 +374,7 @@ public class OcspCertificateValidator { final X509Certificate trustedResponderCertificate = getTrustedResponderCertificate(responderCertificates[0], issuerCertificate); if (trustedResponderCertificate != null) { // verify the response - if (basicOcspResponse.verify(trustedResponderCertificate.getPublicKey(), null)) { + if (basicOcspResponse.isSignatureValid(new JcaContentVerifierProviderBuilder().setProvider("BC").build(trustedResponderCertificate.getPublicKey()))) { ocspStatus.setVerificationStatus(VerificationStatus.Verified); } else { ocspStatus.setVerificationStatus(VerificationStatus.Unverified); @@ -383,30 +402,40 @@ public class OcspCertificateValidator { } } } - } catch (final OCSPException | IOException | UniformInterfaceException | ClientHandlerException | NoSuchProviderException e) { + } catch (final OCSPException | IOException | UniformInterfaceException | ClientHandlerException | OperatorCreationException e) { logger.error(e.getMessage(), e); + } catch (CertificateException e) { + e.printStackTrace(); } return ocspStatus; } + private ClientResponse getClientResponse(OCSPReq ocspRequest) throws IOException { + final WebResource resource = client.resource(validationAuthorityURI); + return resource.header(CONTENT_TYPE_HEADER, OCSP_REQUEST_CONTENT_TYPE).post(ClientResponse.class, ocspRequest.getEncoded()); + } + /** * Gets the trusted responder certificate. The response contains the responder certificate, however we cannot blindly trust it. Instead, we use a configured trusted CA. If the responder * certificate is a trusted CA, then we can use it. If the responder certificate is not directly trusted, we still may be able to trust it if it was issued by the same CA that issued the subject * certificate. Other various checks may be required (this portion is currently not implemented). * - * @param responderCertificate cert - * @param issuerCertificate cert + * @param responderCertificateHolder cert + * @param issuerCertificate cert * @return cert */ - private X509Certificate getTrustedResponderCertificate(final X509Certificate responderCertificate, final X509Certificate issuerCertificate) { + private X509Certificate getTrustedResponderCertificate(final X509CertificateHolder responderCertificateHolder, final X509Certificate issuerCertificate) throws CertificateException { // look for the responder's certificate specifically - if (trustedCAs.containsKey(responderCertificate.getSubjectX500Principal().getName())) { - return trustedCAs.get(responderCertificate.getSubjectX500Principal().getName()); + final X509Certificate responderCertificate = new JcaX509CertificateConverter().setProvider("BC").getCertificate(responderCertificateHolder); + final String trustedCAName = responderCertificate.getSubjectX500Principal().getName(); + if (trustedCAs.containsKey(trustedCAName)) { + return trustedCAs.get(trustedCAName); } // if the responder certificate was issued by the same CA that issued the subject certificate we may be able to use that... - if (responderCertificate.getIssuerX500Principal().equals(issuerCertificate.getSubjectX500Principal())) { + final X500Principal issuerCA = issuerCertificate.getSubjectX500Principal(); + if (responderCertificate.getIssuerX500Principal().equals(issuerCA)) { // perform a number of verification steps... TODO... from sun.security.provider.certpath.OCSPResponse.java... currently incomplete... // try { // // ensure appropriate key usage diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspRequest.java index 5318c133ef..1f69689b6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/x509/ocsp/OcspRequest.java @@ -66,4 +66,13 @@ public class OcspRequest { return true; } + @Override + public String toString() { + return new StringBuilder("NiFi OCSP Request: ") + .append("Subject DN: ").append(subjectCertificate != null ? subjectCertificate.getSubjectDN().getName() : "") + .append(" issued by ") + .append("Issuer DN: ").append(issuerCertificate != null ? issuerCertificate.getSubjectDN().getName() : "").toString(); + + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/test/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidatorTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/test/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidatorTest.java new file mode 100644 index 0000000000..7d9c542dd3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/test/java/org/apache/nifi/web/security/x509/ocsp/OcspCertificateValidatorTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.security.x509.ocsp; + +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.ExtendedKeyUsage; +import org.bouncycastle.asn1.x509.KeyPurposeId; +import org.bouncycastle.asn1.x509.KeyUsage; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.asn1.x509.X509Extension; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.security.InvalidKeyException; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Security; +import java.security.SignatureException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Date; +import java.util.Vector; + +public class OcspCertificateValidatorTest { + private static final Logger logger = LoggerFactory.getLogger(OcspCertificateValidatorTest.class); + + private static final int KEY_SIZE = 2048; + + private static final long YESTERDAY = System.currentTimeMillis() - 24 * 60 * 60 * 1000; + private static final long ONE_YEAR_FROM_NOW = System.currentTimeMillis() + 365 * 24 * 60 * 60 * 1000; + private static final String SIGNATURE_ALGORITHM = "SHA256withRSA"; + private static final String PROVIDER = "BC"; + + private static final String ISSUER_DN = "CN=NiFi Test CA,OU=Security,O=Apache,ST=CA,C=US"; + + private static X509Certificate ISSUER_CERTIFICATE; + + @BeforeClass + public static void setUpOnce() throws Exception { + Security.addProvider(new BouncyCastleProvider()); + +// ISSUER_CERTIFICATE = generateCertificate(ISSUER_DN); + } + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + + } + + /** + * Generates a public/private RSA keypair using the default key size. + * + * @return the keypair + * @throws NoSuchAlgorithmException if the RSA algorithm is not available + */ + private static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); + keyPairGenerator.initialize(KEY_SIZE); + return keyPairGenerator.generateKeyPair(); + } + + /** + * Generates a signed certificate using an on-demand keypair. + * + * @param dn the DN + * @return the certificate + * @throws IOException if an exception occurs + * @throws NoSuchAlgorithmException if an exception occurs + * @throws CertificateException if an exception occurs + * @throws NoSuchProviderException if an exception occurs + * @throws SignatureException if an exception occurs + * @throws InvalidKeyException if an exception occurs + * @throws OperatorCreationException if an exception occurs + */ + private static X509Certificate generateCertificate(String dn) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, + InvalidKeyException, OperatorCreationException { + KeyPair keyPair = generateKeyPair(); + return generateCertificate(dn, keyPair); + } + + /** + * Generates a signed certificate with a specific keypair. + * + * @param dn the DN + * @param keyPair the public key will be included in the certificate and the the private key is used to sign the certificate + * @return the certificate + * @throws IOException if an exception occurs + * @throws NoSuchAlgorithmException if an exception occurs + * @throws CertificateException if an exception occurs + * @throws NoSuchProviderException if an exception occurs + * @throws SignatureException if an exception occurs + * @throws InvalidKeyException if an exception occurs + * @throws OperatorCreationException if an exception occurs + */ + private static X509Certificate generateCertificate(String dn, KeyPair keyPair) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, + InvalidKeyException, OperatorCreationException { + PrivateKey privateKey = keyPair.getPrivate(); + ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(privateKey); + SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); + Date startDate = new Date(YESTERDAY); + Date endDate = new Date(ONE_YEAR_FROM_NOW); + + X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder( + new X500Name(dn), + BigInteger.valueOf(System.currentTimeMillis()), + startDate, endDate, + new X500Name(dn), + subPubKeyInfo); + + // Set certificate extensions + // (1) digitalSignature extension + certBuilder.addExtension(X509Extension.keyUsage, true, + new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment | KeyUsage.dataEncipherment | KeyUsage.keyAgreement)); + + // (2) extendedKeyUsage extension + Vector ekUsages = new Vector<>(); + ekUsages.add(KeyPurposeId.id_kp_clientAuth); + ekUsages.add(KeyPurposeId.id_kp_serverAuth); + certBuilder.addExtension(X509Extension.extendedKeyUsage, false, new ExtendedKeyUsage(ekUsages)); + + // Sign the certificate + X509CertificateHolder certificateHolder = certBuilder.build(sigGen); + return new JcaX509CertificateConverter().setProvider(PROVIDER) + .getCertificate(certificateHolder); + } + + /** + * Generates a certificate signed by the issuer key. + * + * @param dn the subject DN + * @param issuerDn the issuer DN + * @param issuerKey the issuer private key + * @return the certificate + * @throws IOException if an exception occurs + * @throws NoSuchAlgorithmException if an exception occurs + * @throws CertificateException if an exception occurs + * @throws NoSuchProviderException if an exception occurs + * @throws SignatureException if an exception occurs + * @throws InvalidKeyException if an exception occurs + * @throws OperatorCreationException if an exception occurs + */ + private static X509Certificate generateIssuedCertificate(String dn, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, CertificateException, + NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + KeyPair keyPair = generateKeyPair(); + return generateIssuedCertificate(dn, keyPair.getPublic(), issuerDn, issuerKey); + } + + /** + * Generates a certificate with a specific public key signed by the issuer key. + * + * @param dn the subject DN + * @param publicKey the subject public key + * @param issuerDn the issuer DN + * @param issuerKey the issuer private key + * @return the certificate + * @throws IOException if an exception occurs + * @throws NoSuchAlgorithmException if an exception occurs + * @throws CertificateException if an exception occurs + * @throws NoSuchProviderException if an exception occurs + * @throws SignatureException if an exception occurs + * @throws InvalidKeyException if an exception occurs + * @throws OperatorCreationException if an exception occurs + */ + private static X509Certificate generateIssuedCertificate(String dn, PublicKey publicKey, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, + CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(issuerKey); + SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(publicKey.getEncoded()); + Date startDate = new Date(YESTERDAY); + Date endDate = new Date(ONE_YEAR_FROM_NOW); + + X509v3CertificateBuilder v3CertGen = new X509v3CertificateBuilder( + new X500Name(issuerDn), + BigInteger.valueOf(System.currentTimeMillis()), + startDate, endDate, + new X500Name(dn), + subPubKeyInfo); + + X509CertificateHolder certificateHolder = v3CertGen.build(sigGen); + return new JcaX509CertificateConverter().setProvider(PROVIDER) + .getCertificate(certificateHolder); + } + + @Test + public void testShouldGenerateCertificate() throws Exception { + // Arrange + final String testDn = "CN=This is a test"; + + // Act + X509Certificate certificate = generateCertificate(testDn); + logger.info("Generated certificate: \n{}", certificate); + + // Assert + assert certificate.getSubjectDN().getName().equals(testDn); + assert certificate.getIssuerDN().getName().equals(testDn); + certificate.verify(certificate.getPublicKey()); + } + + @Test + public void testShouldGenerateCertificateFromKeyPair() throws Exception { + // Arrange + final String testDn = "CN=This is a test"; + final KeyPair keyPair = generateKeyPair(); + + // Act + X509Certificate certificate = generateCertificate(testDn, keyPair); + logger.info("Generated certificate: \n{}", certificate); + + // Assert + assert certificate.getPublicKey().equals(keyPair.getPublic()); + assert certificate.getSubjectDN().getName().equals(testDn); + assert certificate.getIssuerDN().getName().equals(testDn); + certificate.verify(certificate.getPublicKey()); + } + + @Test + public void testShouldGenerateIssuedCertificate() throws Exception { + // Arrange + final String testDn = "CN=This is a signed test"; + final String issuerDn = "CN=Issuer CA"; + final KeyPair issuerKeyPair = generateKeyPair(); + final PrivateKey issuerPrivateKey = issuerKeyPair.getPrivate(); + + final X509Certificate issuerCertificate = generateCertificate(issuerDn, issuerKeyPair); + logger.info("Generated issuer certificate: \n{}", issuerCertificate); + + // Act + X509Certificate certificate = generateIssuedCertificate(testDn, issuerDn, issuerPrivateKey); + logger.info("Generated signed certificate: \n{}", certificate); + + // Assert + assert issuerCertificate.getPublicKey().equals(issuerKeyPair.getPublic()); + assert certificate.getSubjectX500Principal().getName().equals(testDn); + assert certificate.getIssuerX500Principal().getName().equals(issuerDn); + certificate.verify(issuerCertificate.getPublicKey()); + + try { + certificate.verify(certificate.getPublicKey()); + Assert.fail("Should have thrown exception"); + } catch (Exception e) { + assert e instanceof SignatureException; + assert e.getMessage().contains("certificate does not verify with supplied key"); + } + } + + @Ignore("To be implemented with Groovy test") + @Test + public void testShouldValidateCertificate() throws Exception { + + } + + @Ignore("To be implemented with Groovy test") + @Test + public void testShouldNotValidateEmptyCertificate() throws Exception { + + } + + @Ignore("To be implemented with Groovy test") + @Test + public void testShouldNotValidateInvalidCertificate() throws Exception { + + } + + @Ignore("To be implemented with Groovy test") + @Test + public void testValidateShouldHandleUnsignedResponse() throws Exception { + + } + + @Ignore("To be implemented with Groovy test") + @Test + public void testValidateShouldHandleResponseWithIncorrectNonce() throws Exception { + + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index e189027e76..5ec2cf59b1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -74,11 +74,11 @@ language governing permissions and limitations under the License. --> org.bouncycastle - bcprov-jdk16 + bcprov-jdk15on org.bouncycastle - bcpg-jdk16 + bcpg-jdk15on commons-codec diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index 50397c8544..8cad3cb725 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.security.Security; -import java.text.Normalizer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -56,6 +46,16 @@ import org.apache.nifi.security.util.KeyDerivationFunction; import org.apache.nifi.util.StopWatch; import org.bouncycastle.jce.provider.BouncyCastleProvider; +import java.security.Security; +import java.text.Normalizer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + @EventDriven @SideEffectFree @SupportsBatching diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java index 1931c720a6..b97e416101 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.standard.util; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.NoSuchProviderException; -import java.security.SecureRandom; -import java.util.Date; -import java.util.Iterator; -import java.util.zip.Deflater; - import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processors.standard.EncryptContent; @@ -41,6 +30,7 @@ import org.bouncycastle.openpgp.PGPException; import org.bouncycastle.openpgp.PGPLiteralData; import org.bouncycastle.openpgp.PGPLiteralDataGenerator; import org.bouncycastle.openpgp.PGPObjectFactory; +import org.bouncycastle.openpgp.PGPOnePassSignatureList; import org.bouncycastle.openpgp.PGPPrivateKey; import org.bouncycastle.openpgp.PGPPublicKey; import org.bouncycastle.openpgp.PGPPublicKeyEncryptedData; @@ -50,18 +40,41 @@ import org.bouncycastle.openpgp.PGPSecretKey; import org.bouncycastle.openpgp.PGPSecretKeyRing; import org.bouncycastle.openpgp.PGPSecretKeyRingCollection; import org.bouncycastle.openpgp.PGPUtil; +import org.bouncycastle.openpgp.jcajce.JcaPGPObjectFactory; +import org.bouncycastle.openpgp.operator.PBESecretKeyDecryptor; +import org.bouncycastle.openpgp.operator.PublicKeyDataDecryptorFactory; +import org.bouncycastle.openpgp.operator.bc.BcKeyFingerprintCalculator; +import org.bouncycastle.openpgp.operator.jcajce.JcePBESecretKeyDecryptorBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePGPDataEncryptorBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePublicKeyDataDecryptorFactoryBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePublicKeyKeyEncryptionMethodGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; +import java.util.Date; +import java.util.Iterator; +import java.util.zip.Deflater; + +import static org.apache.nifi.processors.standard.util.PGPUtil.BLOCK_SIZE; +import static org.apache.nifi.processors.standard.util.PGPUtil.BUFFER_SIZE; public class OpenPGPKeyBasedEncryptor implements Encryptor { + private static final Logger logger = LoggerFactory.getLogger(OpenPGPPasswordBasedEncryptor.class); private String algorithm; private String provider; + // TODO: This can hold either the secret or public keyring path private String keyring; private String userId; private char[] passphrase; private String filename; - public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG"; - public OpenPGPKeyBasedEncryptor(final String algorithm, final String provider, final String keyring, final String userId, final char[] passphrase, final String filename) { this.algorithm = algorithm; this.provider = provider; @@ -81,75 +94,137 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { return new OpenPGPDecryptCallback(provider, keyring, passphrase); } - /* - * Validate secret keyring passphrase + /** + * Returns true if the passphrase is valid. + *

+ * This is used in the EncryptContent custom validation to check if the passphrase can extract a private key from the secret key ring. After BC was upgraded from 1.46 to 1.53, the API changed so this is performed differently but the functionality is equivalent. + * + * @param provider the provider name + * @param secretKeyringFile the file path to the keyring + * @param passphrase the passphrase + * @return true if the passphrase can successfully extract any private key + * @throws IOException if there is a problem reading the keyring file + * @throws PGPException if there is a problem parsing/extracting the private key + * @throws NoSuchProviderException if the provider is not available */ public static boolean validateKeyring(String provider, String secretKeyringFile, char[] passphrase) throws IOException, PGPException, NoSuchProviderException { - try (InputStream fin = Files.newInputStream(Paths.get(secretKeyringFile)); InputStream pin = PGPUtil.getDecoderStream(fin)) { - PGPSecretKeyRingCollection pgpsec = new PGPSecretKeyRingCollection(pin); - Iterator ringit = pgpsec.getKeyRings(); - while (ringit.hasNext()) { - PGPSecretKeyRing secretkeyring = (PGPSecretKeyRing) ringit.next(); - PGPSecretKey secretkey = secretkeyring.getSecretKey(); - secretkey.extractPrivateKey(passphrase, provider); - return true; - } + try { + getDecryptedPrivateKey(provider, secretKeyringFile, passphrase); + return true; + } catch (Exception e) { + // If this point is reached, no private key could be extracted with the given passphrase return false; } + } + private static PGPPrivateKey getDecryptedPrivateKey(String provider, String secretKeyringFile, char[] passphrase) throws IOException, PGPException { + // TODO: Verify that key IDs cannot be 0 + return getDecryptedPrivateKey(provider, secretKeyringFile, 0L, passphrase); + } + + private static PGPPrivateKey getDecryptedPrivateKey(String provider, String secretKeyringFile, long keyId, char[] passphrase) throws IOException, PGPException { + // TODO: Reevaluate the mechanism for executing this task as performance can suffer here and only a specific key needs to be validated + + // Read in from the secret keyring file + try (FileInputStream keyInputStream = new FileInputStream(secretKeyringFile)) { + + // Form the SecretKeyRing collection (1.53 way with fingerprint calculator) + PGPSecretKeyRingCollection pgpSecretKeyRingCollection = new PGPSecretKeyRingCollection(keyInputStream, new BcKeyFingerprintCalculator()); + + // The decryptor is identical for all keys + final PBESecretKeyDecryptor decryptor = new JcePBESecretKeyDecryptorBuilder().setProvider(provider).build(passphrase); + + // Iterate over all secret keyrings + Iterator keyringIterator = pgpSecretKeyRingCollection.getKeyRings(); + PGPSecretKeyRing keyRing; + PGPSecretKey secretKey; + + while (keyringIterator.hasNext()) { + keyRing = keyringIterator.next(); + + // If keyId exists, get a specific secret key; else, iterate over all + if (keyId != 0) { + secretKey = keyRing.getSecretKey(keyId); + try { + return secretKey.extractPrivateKey(decryptor); + } catch (Exception e) { + throw new PGPException("No private key available using passphrase", e); + } + } else { + Iterator keyIterator = keyRing.getSecretKeys(); + + while (keyIterator.hasNext()) { + secretKey = keyIterator.next(); + try { + return secretKey.extractPrivateKey(decryptor); + } catch (Exception e) { + // TODO: Log (expected) failures? + } + } + } + } + } + + // If this point is reached, no private key could be extracted with the given passphrase + throw new PGPException("No private key available using passphrase"); } /* * Get the public key for a specific user id from a keyring. */ @SuppressWarnings("rawtypes") - public static PGPPublicKey getPublicKey(String userId, String publicKeyring) throws IOException, PGPException { - PGPPublicKey pubkey = null; - try (InputStream fin = Files.newInputStream(Paths.get(publicKeyring)); InputStream pin = PGPUtil.getDecoderStream(fin)) { - PGPPublicKeyRingCollection pgppub = new PGPPublicKeyRingCollection(pin); + public static PGPPublicKey getPublicKey(String userId, String publicKeyringFile) throws IOException, PGPException { + // TODO: Reevaluate the mechanism for executing this task as performance can suffer here and only a specific key needs to be validated - Iterator ringit = pgppub.getKeyRings(); - while (ringit.hasNext()) { - PGPPublicKeyRing kring = (PGPPublicKeyRing) ringit.next(); + // Read in from the public keyring file + try (FileInputStream keyInputStream = new FileInputStream(publicKeyringFile)) { - Iterator keyit = kring.getPublicKeys(); - while (keyit.hasNext()) { - pubkey = (PGPPublicKey) keyit.next(); - boolean userIdMatch = false; + // Form the PublicKeyRing collection (1.53 way with fingerprint calculator) + PGPPublicKeyRingCollection pgpPublicKeyRingCollection = new PGPPublicKeyRingCollection(keyInputStream, new BcKeyFingerprintCalculator()); - Iterator userit = pubkey.getUserIDs(); - while (userit.hasNext()) { - String id = userit.next().toString(); - if (id.contains(userId)) { - userIdMatch = true; - break; + // Iterate over all public keyrings + Iterator iter = pgpPublicKeyRingCollection.getKeyRings(); + PGPPublicKeyRing keyRing; + while (iter.hasNext()) { + keyRing = iter.next(); + + // Iterate over each public key in this keyring + Iterator keyIter = keyRing.getPublicKeys(); + while (keyIter.hasNext()) { + PGPPublicKey publicKey = keyIter.next(); + + // Iterate over each userId attached to the public key + Iterator userIdIterator = publicKey.getUserIDs(); + while (userIdIterator.hasNext()) { + String id = (String) userIdIterator.next(); + if (userId.equalsIgnoreCase(id)) { + return publicKey; } } - if (pubkey.isEncryptionKey() && userIdMatch) { - return pubkey; - } } } } - return null; + + // If this point is reached, no public key could be extracted with the given userId + throw new PGPException("Could not find a public key with the given userId"); } private static class OpenPGPDecryptCallback implements StreamCallback { private String provider; - private String secretKeyring; + private String secretKeyringFile; private char[] passphrase; - OpenPGPDecryptCallback(final String provider, final String keyring, final char[] passphrase) { + OpenPGPDecryptCallback(final String provider, final String secretKeyringFile, final char[] passphrase) { this.provider = provider; - this.secretKeyring = keyring; + this.secretKeyringFile = secretKeyringFile; this.passphrase = passphrase; } @Override public void process(InputStream in, OutputStream out) throws IOException { try (InputStream pgpin = PGPUtil.getDecoderStream(in)) { - PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin); + PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin, new BcKeyFingerprintCalculator()); Object obj = pgpFactory.nextObject(); if (!(obj instanceof PGPEncryptedDataList)) { @@ -160,19 +235,11 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { } PGPEncryptedDataList encList = (PGPEncryptedDataList) obj; - PGPSecretKeyRingCollection pgpSecretKeyring; - try (InputStream secretKeyringIS = Files.newInputStream(Paths.get(secretKeyring)); InputStream pgpIS = PGPUtil.getDecoderStream(secretKeyringIS)) { - // open secret keyring file - pgpSecretKeyring = new PGPSecretKeyRingCollection(pgpIS); - } catch (Exception e) { - throw new ProcessException("Invalid secret keyring - " + e.getMessage()); - } - try { PGPPrivateKey privateKey = null; PGPPublicKeyEncryptedData encData = null; - // find the secret key in the encrypted data + // Find the secret key in the encrypted data Iterator it = encList.getEncryptedDataObjects(); while (privateKey == null && it.hasNext()) { obj = it.next(); @@ -180,32 +247,60 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { throw new ProcessException("Invalid OpenPGP data"); } encData = (PGPPublicKeyEncryptedData) obj; - PGPSecretKey secretkey = pgpSecretKeyring.getSecretKey(encData.getKeyID()); - if (secretkey != null) { - privateKey = secretkey.extractPrivateKey(passphrase, provider); + + // Check each encrypted data object to see if it contains the key ID for the secret key -> private key + try { + privateKey = getDecryptedPrivateKey(provider, secretKeyringFile, encData.getKeyID(), passphrase); + } catch (PGPException e) { + // TODO: Log (expected) exception? } } if (privateKey == null) { throw new ProcessException("Secret keyring does not contain the key required to decrypt"); } - try (InputStream clearData = encData.getDataStream(privateKey, provider)) { - PGPObjectFactory clearFactory = new PGPObjectFactory(clearData); + // Read in the encrypted data stream and decrypt it + final PublicKeyDataDecryptorFactory dataDecryptor = new JcePublicKeyDataDecryptorFactoryBuilder().setProvider(provider).build(privateKey); + try (InputStream clear = encData.getDataStream(dataDecryptor)) { + // Create a plain object factory + JcaPGPObjectFactory plainFact = new JcaPGPObjectFactory(clear); - obj = clearFactory.nextObject(); - if (obj instanceof PGPCompressedData) { - PGPCompressedData compData = (PGPCompressedData) obj; - clearFactory = new PGPObjectFactory(compData.getDataStream()); - obj = clearFactory.nextObject(); + Object message = plainFact.nextObject(); + + // Check the message type and act accordingly + + // If compressed, decompress + if (message instanceof PGPCompressedData) { + PGPCompressedData cData = (PGPCompressedData) message; + JcaPGPObjectFactory pgpFact = new JcaPGPObjectFactory(cData.getDataStream()); + + message = pgpFact.nextObject(); } - PGPLiteralData literal = (PGPLiteralData) obj; - try (InputStream lis = literal.getInputStream()) { - final byte[] buffer = new byte[4096]; - int len; - while ((len = lis.read(buffer)) >= 0) { - out.write(buffer, 0, len); + // If the message is literal data, read it and process to the out stream + if (message instanceof PGPLiteralData) { + PGPLiteralData literalData = (PGPLiteralData) message; + + try (InputStream lis = literalData.getInputStream()) { + final byte[] buffer = new byte[BLOCK_SIZE]; + int len; + while ((len = lis.read(buffer)) >= 0) { + out.write(buffer, 0, len); + } } + } else if (message instanceof PGPOnePassSignatureList) { + // TODO: This is legacy code but should verify signature list here + throw new PGPException("encrypted message contains a signed message - not literal data."); + } else { + throw new PGPException("message is not a simple encrypted file - type unknown."); + } + + if (encData.isIntegrityProtected()) { + if (!encData.verify()) { + throw new PGPException("Failed message integrity check"); + } + } else { + logger.warn("No message integrity check"); } } } catch (Exception e) { @@ -235,6 +330,8 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { @Override public void process(InputStream in, OutputStream out) throws IOException { PGPPublicKey publicKey; + final boolean isArmored = EncryptContent.isPGPArmoredAlgorithm(algorithm); + try { publicKey = getPublicKey(userId, publicKeyring); } catch (Exception e) { @@ -242,35 +339,35 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { } try { - SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); - OutputStream output = out; - if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) { + if (isArmored) { output = new ArmoredOutputStream(out); } try { - PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false, secureRandom, provider); - encGenerator.addMethod(publicKey); - try (OutputStream encOut = encGenerator.open(output, new byte[65536])) { + // TODO: Refactor internal symmetric encryption algorithm to be customizable + PGPEncryptedDataGenerator encryptedDataGenerator = new PGPEncryptedDataGenerator( + new JcePGPDataEncryptorBuilder(PGPEncryptedData.AES_128).setWithIntegrityPacket(true).setSecureRandom(new SecureRandom()).setProvider(provider)); - PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED); - try (OutputStream compOut = compData.open(encOut, new byte[65536])) { + encryptedDataGenerator.addMethod(new JcePublicKeyKeyEncryptionMethodGenerator(publicKey).setProvider(provider)); - PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator(); - try (OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536])) { + // TODO: Refactor shared encryption code to utility + try (OutputStream encryptedOut = encryptedDataGenerator.open(output, new byte[BUFFER_SIZE])) { + PGPCompressedDataGenerator compressedDataGenerator = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED); + try (OutputStream compressedOut = compressedDataGenerator.open(encryptedOut, new byte[BUFFER_SIZE])) { + PGPLiteralDataGenerator literalDataGenerator = new PGPLiteralDataGenerator(); + try (OutputStream literalOut = literalDataGenerator.open(compressedOut, PGPLiteralData.BINARY, filename, new Date(), new byte[BUFFER_SIZE])) { - final byte[] buffer = new byte[4096]; + final byte[] buffer = new byte[BLOCK_SIZE]; int len; while ((len = in.read(buffer)) >= 0) { literalOut.write(buffer, 0, len); } - } } } } finally { - if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) { + if (isArmored) { output.close(); } } @@ -278,6 +375,5 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { throw new ProcessException(e.getMessage()); } } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java index e0c398c069..1ffe9e4659 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java @@ -16,38 +16,39 @@ */ package org.apache.nifi.processors.standard.util; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.standard.EncryptContent.Encryptor; +import org.bouncycastle.openpgp.PGPCompressedData; +import org.bouncycastle.openpgp.PGPEncryptedData; +import org.bouncycastle.openpgp.PGPEncryptedDataList; +import org.bouncycastle.openpgp.PGPException; +import org.bouncycastle.openpgp.PGPLiteralData; +import org.bouncycastle.openpgp.PGPPBEEncryptedData; +import org.bouncycastle.openpgp.jcajce.JcaPGPObjectFactory; +import org.bouncycastle.openpgp.operator.PBEDataDecryptorFactory; +import org.bouncycastle.openpgp.operator.PGPDigestCalculatorProvider; +import org.bouncycastle.openpgp.operator.PGPKeyEncryptionMethodGenerator; +import org.bouncycastle.openpgp.operator.jcajce.JcaPGPDigestCalculatorProviderBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePBEDataDecryptorFactoryBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.security.SecureRandom; -import java.util.Date; -import java.util.zip.Deflater; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processors.standard.EncryptContent; -import org.apache.nifi.processors.standard.EncryptContent.Encryptor; -import org.bouncycastle.bcpg.ArmoredOutputStream; -import org.bouncycastle.openpgp.PGPCompressedData; -import org.bouncycastle.openpgp.PGPCompressedDataGenerator; -import org.bouncycastle.openpgp.PGPEncryptedData; -import org.bouncycastle.openpgp.PGPEncryptedDataGenerator; -import org.bouncycastle.openpgp.PGPEncryptedDataList; -import org.bouncycastle.openpgp.PGPLiteralData; -import org.bouncycastle.openpgp.PGPLiteralDataGenerator; -import org.bouncycastle.openpgp.PGPObjectFactory; -import org.bouncycastle.openpgp.PGPPBEEncryptedData; -import org.bouncycastle.openpgp.PGPUtil; +import static org.bouncycastle.openpgp.PGPUtil.getDecoderStream; public class OpenPGPPasswordBasedEncryptor implements Encryptor { + private static final Logger logger = LoggerFactory.getLogger(OpenPGPPasswordBasedEncryptor.class); private String algorithm; private String provider; private char[] password; private String filename; - public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG"; - public OpenPGPPasswordBasedEncryptor(final String algorithm, final String provider, final char[] passphrase, final String filename) { this.algorithm = algorithm; this.provider = provider; @@ -77,8 +78,8 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor { @Override public void process(InputStream in, OutputStream out) throws IOException { - InputStream pgpin = PGPUtil.getDecoderStream(in); - PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin); + InputStream pgpin = getDecoderStream(in); + JcaPGPObjectFactory pgpFactory = new JcaPGPObjectFactory(pgpin); Object obj = pgpFactory.nextObject(); if (!(obj instanceof PGPEncryptedDataList)) { @@ -93,31 +94,41 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor { if (!(obj instanceof PGPPBEEncryptedData)) { throw new ProcessException("Invalid OpenPGP data"); } - PGPPBEEncryptedData encData = (PGPPBEEncryptedData) obj; + PGPPBEEncryptedData encryptedData = (PGPPBEEncryptedData) obj; try { - InputStream clearData = encData.getDataStream(password, provider); - PGPObjectFactory clearFactory = new PGPObjectFactory(clearData); + final PGPDigestCalculatorProvider digestCalculatorProvider = new JcaPGPDigestCalculatorProviderBuilder().setProvider(provider).build(); + final PBEDataDecryptorFactory decryptorFactory = new JcePBEDataDecryptorFactoryBuilder(digestCalculatorProvider).setProvider(provider).build(password); + InputStream clear = encryptedData.getDataStream(decryptorFactory); - obj = clearFactory.nextObject(); + JcaPGPObjectFactory pgpObjectFactory = new JcaPGPObjectFactory(clear); + + obj = pgpObjectFactory.nextObject(); if (obj instanceof PGPCompressedData) { - PGPCompressedData compData = (PGPCompressedData) obj; - clearFactory = new PGPObjectFactory(compData.getDataStream()); - obj = clearFactory.nextObject(); + PGPCompressedData compressedData = (PGPCompressedData) obj; + pgpObjectFactory = new JcaPGPObjectFactory(compressedData.getDataStream()); + obj = pgpObjectFactory.nextObject(); } - PGPLiteralData literal = (PGPLiteralData) obj; - InputStream lis = literal.getInputStream(); - final byte[] buffer = new byte[4096]; + PGPLiteralData literalData = (PGPLiteralData) obj; + InputStream plainIn = literalData.getInputStream(); + final byte[] buffer = new byte[org.apache.nifi.processors.standard.util.PGPUtil.BLOCK_SIZE]; int len; - while ((len = lis.read(buffer)) >= 0) { + while ((len = plainIn.read(buffer)) >= 0) { out.write(buffer, 0, len); } + + if (encryptedData.isIntegrityProtected()) { + if (!encryptedData.verify()) { + throw new PGPException("Integrity check failed"); + } + } else { + logger.warn("No message integrity check"); + } } catch (Exception e) { throw new ProcessException(e.getMessage()); } } - } private static class OpenPGPEncryptCallback implements StreamCallback { @@ -137,39 +148,11 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor { @Override public void process(InputStream in, OutputStream out) throws IOException { try { - SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); - - OutputStream output = out; - if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) { - output = new ArmoredOutputStream(out); - } - - PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false, - secureRandom, provider); - encGenerator.addMethod(password); - OutputStream encOut = encGenerator.open(output, new byte[65536]); - - PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED); - OutputStream compOut = compData.open(encOut, new byte[65536]); - - PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator(); - OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]); - - final byte[] buffer = new byte[4096]; - int len; - while ((len = in.read(buffer)) >= 0) { - literalOut.write(buffer, 0, len); - } - - literalOut.close(); - compOut.close(); - encOut.close(); - output.close(); + PGPKeyEncryptionMethodGenerator encryptionMethodGenerator = new JcePBEKeyEncryptionMethodGenerator(password).setProvider(provider); + org.apache.nifi.processors.standard.util.PGPUtil.encrypt(in, out, algorithm, provider, PGPEncryptedData.AES_128, filename, encryptionMethodGenerator); } catch (Exception e) { throw new ProcessException(e.getMessage()); } - } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java new file mode 100644 index 0000000000..b1ee11f650 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.apache.nifi.processors.standard.EncryptContent; +import org.bouncycastle.bcpg.ArmoredOutputStream; +import org.bouncycastle.openpgp.PGPCompressedData; +import org.bouncycastle.openpgp.PGPCompressedDataGenerator; +import org.bouncycastle.openpgp.PGPEncryptedData; +import org.bouncycastle.openpgp.PGPEncryptedDataGenerator; +import org.bouncycastle.openpgp.PGPException; +import org.bouncycastle.openpgp.PGPLiteralData; +import org.bouncycastle.openpgp.PGPLiteralDataGenerator; +import org.bouncycastle.openpgp.operator.PGPKeyEncryptionMethodGenerator; +import org.bouncycastle.openpgp.operator.jcajce.JcePGPDataEncryptorBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.SecureRandom; +import java.util.Date; +import java.util.zip.Deflater; + +/** + * This class contains static utility methods to assist with common PGP operations. + */ +public class PGPUtil { + private static final Logger logger = LoggerFactory.getLogger(PGPUtil.class); + + public static final int BUFFER_SIZE = 65536; + public static final int BLOCK_SIZE = 4096; + + public static void encrypt(InputStream in, OutputStream out, String algorithm, String provider, int cipher, String filename, PGPKeyEncryptionMethodGenerator encryptionMethodGenerator) throws IOException, PGPException { + final boolean isArmored = EncryptContent.isPGPArmoredAlgorithm(algorithm); + OutputStream output = out; + if (isArmored) { + output = new ArmoredOutputStream(out); + } + + // Default value, do not allow null encryption + if (cipher == PGPEncryptedData.NULL) { + logger.warn("Null encryption not allowed; defaulting to AES-128"); + cipher = PGPEncryptedData.AES_128; + } + + try { + // TODO: Can probably hardcode provider to BC and remove one method parameter + PGPEncryptedDataGenerator encryptedDataGenerator = new PGPEncryptedDataGenerator( + new JcePGPDataEncryptorBuilder(cipher).setWithIntegrityPacket(true).setSecureRandom(new SecureRandom()).setProvider(provider)); + + encryptedDataGenerator.addMethod(encryptionMethodGenerator); + + try (OutputStream encryptedOut = encryptedDataGenerator.open(output, new byte[BUFFER_SIZE])) { + PGPCompressedDataGenerator compressedDataGenerator = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED); + try (OutputStream compressedOut = compressedDataGenerator.open(encryptedOut, new byte[BUFFER_SIZE])) { + PGPLiteralDataGenerator literalDataGenerator = new PGPLiteralDataGenerator(); + try (OutputStream literalOut = literalDataGenerator.open(compressedOut, PGPLiteralData.BINARY, filename, new Date(), new byte[BUFFER_SIZE])) { + + final byte[] buffer = new byte[BLOCK_SIZE]; + int len; + while ((len = in.read(buffer)) >= 0) { + literalOut.write(buffer, 0, len); + } + } + } + } + } finally { + if (isArmored) { + output.close(); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java index ee21a50c8e..3be29760a5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.security.Security; -import java.util.Collection; - import org.apache.commons.codec.binary.Hex; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processors.standard.util.PasswordBasedEncryptor; @@ -39,6 +33,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.security.Security; +import java.util.Collection; + public class TestEncryptContent { private static final Logger logger = LoggerFactory.getLogger(TestEncryptContent.class); @@ -203,6 +203,128 @@ public class TestEncryptContent { flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt")); } + @Test + public void testShouldValidatePGPPublicKeyringRequiresUserId() { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection results; + MockProcessContext pc; + + runner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/pubring.gpg"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + + // Act + results = pc.validate(); + + // Assert + Assert.assertEquals(1, results.size()); + ValidationResult vr = (ValidationResult) results.toArray()[0]; + String expectedResult = " encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " + + EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and " + + EncryptContent.PUBLIC_KEY_USERID.getDisplayName(); + String message = "'" + vr.toString() + "' contains '" + expectedResult + "'"; + Assert.assertTrue(message, vr.toString().contains(expectedResult)); + } + + @Test + public void testShouldValidatePGPPublicKeyringExists() { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection results; + MockProcessContext pc; + + runner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/pubring.gpg.missing"); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + + // Act + results = pc.validate(); + + // Assert + Assert.assertEquals(1, results.size()); + ValidationResult vr = (ValidationResult) results.toArray()[0]; + String expectedResult = " (No such file or directory)"; + String message = "'" + vr.toString() + "' contains '" + expectedResult + "'"; + Assert.assertTrue(message, vr.toString().contains(expectedResult)); + } + + @Test + public void testShouldValidatePGPPublicKeyringIsProperFormat() { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection results; + MockProcessContext pc; + + runner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + + // Act + results = pc.validate(); + + // Assert + Assert.assertEquals(1, results.size()); + ValidationResult vr = (ValidationResult) results.toArray()[0]; + String expectedResult = " java.io.IOException: invalid header encountered"; + String message = "'" + vr.toString() + "' contains '" + expectedResult + "'"; + Assert.assertTrue(message, vr.toString().contains(expectedResult)); + } + + @Test + public void testShouldValidatePGPPublicKeyringContainsUserId() { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection results; + MockProcessContext pc; + + runner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/pubring.gpg"); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + + // Act + results = pc.validate(); + + // Assert + Assert.assertEquals(1, results.size()); + ValidationResult vr = (ValidationResult) results.toArray()[0]; + String expectedResult = "PGPException: Could not find a public key with the given userId"; + String message = "'" + vr.toString() + "' contains '" + expectedResult + "'"; + Assert.assertTrue(message, vr.toString().contains(expectedResult)); + } + + @Test + public void testShouldExtractPGPPublicKeyFromKeyring() { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection results; + MockProcessContext pc; + + runner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/pubring.gpg"); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "NiFi PGP Test Key (Short test key for NiFi PGP unit tests) "); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + + // Act + results = pc.validate(); + + // Assert + Assert.assertEquals(0, results.size()); + } + @Test public void testValidation() { final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); @@ -249,20 +371,15 @@ public class TestEncryptContent { + EncryptContent.PUBLIC_KEY_USERID.getDisplayName())); } - runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString().contains("does not contain user id USERID")); - } + // Legacy tests moved to individual tests to comply with new library + + // TODO: Move secring tests out to individual as well runner.removeProperty(EncryptContent.PUBLIC_KEYRING); runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID); runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/secring.gpg"); runner.enqueue(new byte[0]); pc = (MockProcessContext) runner.getProcessContext(); results = pc.validate(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java new file mode 100644 index 0000000000..fc7a2f6d4e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.apache.commons.codec.binary.Hex; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.security.util.EncryptionMethod; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; + +public class OpenPGPKeyBasedEncryptorTest { + private static final Logger logger = LoggerFactory.getLogger(OpenPGPKeyBasedEncryptorTest.class); + + private final File plainFile = new File("src/test/resources/TestEncryptContent/text.txt"); + private final File unsignedFile = new File("src/test/resources/TestEncryptContent/text.txt.unsigned.gpg"); + private final File encryptedFile = new File("src/test/resources/TestEncryptContent/text.txt.gpg"); + + private static final String SECRET_KEYRING_PATH = "src/test/resources/TestEncryptContent/secring.gpg"; + private static final String PUBLIC_KEYRING_PATH = "src/test/resources/TestEncryptContent/pubring.gpg"; + private static final String USER_ID = "NiFi PGP Test Key (Short test key for NiFi PGP unit tests) "; + + private static final String PASSWORD = "thisIsABadPassword"; + + @BeforeClass + public static void setUpOnce() throws Exception { + Security.addProvider(new BouncyCastleProvider()); + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testShouldEncryptAndDecrypt() throws Exception { + // Arrange + final String PLAINTEXT = "This is a plaintext message."; + logger.info("Plaintext: {}", PLAINTEXT); + InputStream plainStream = new ByteArrayInputStream(PLAINTEXT.getBytes("UTF-8")); + OutputStream cipherStream = new ByteArrayOutputStream(); + OutputStream recoveredStream = new ByteArrayOutputStream(); + + // No file, just streams + String filename = "tempFile.txt"; + + // Encryptor does not require password + OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), PUBLIC_KEYRING_PATH, USER_ID, new char[0], filename); + StreamCallback encryptionCallback = encryptor.getEncryptionCallback(); + + OpenPGPKeyBasedEncryptor decryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); + StreamCallback decryptionCallback = decryptor.getDecryptionCallback(); + + // Act + encryptionCallback.process(plainStream, cipherStream); + + final byte[] cipherBytes = ((ByteArrayOutputStream) cipherStream).toByteArray(); + logger.info("Encrypted: {}", Hex.encodeHexString(cipherBytes)); + InputStream cipherInputStream = new ByteArrayInputStream(cipherBytes); + + decryptionCallback.process(cipherInputStream, recoveredStream); + + // Assert + byte[] recoveredBytes = ((ByteArrayOutputStream) recoveredStream).toByteArray(); + String recovered = new String(recoveredBytes, "UTF-8"); + logger.info("Recovered: {}", recovered); + assert PLAINTEXT.equals(recovered); + } + + @Test + public void testShouldDecryptExternalFile() throws Exception { + // Arrange + byte[] plainBytes = Files.readAllBytes(Paths.get(plainFile.getPath())); + final String PLAINTEXT = new String(plainBytes, "UTF-8"); + + InputStream cipherStream = new FileInputStream(unsignedFile); + OutputStream recoveredStream = new ByteArrayOutputStream(); + + // No file, just streams + String filename = unsignedFile.getName(); + + OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); + + StreamCallback decryptionCallback = encryptor.getDecryptionCallback(); + + // Act + decryptionCallback.process(cipherStream, recoveredStream); + + // Assert + byte[] recoveredBytes = ((ByteArrayOutputStream) recoveredStream).toByteArray(); + String recovered = new String(recoveredBytes, "UTF-8"); + logger.info("Recovered: {}", recovered); + Assert.assertEquals("Recovered text", PLAINTEXT, recovered); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptorTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptorTest.java new file mode 100644 index 0000000000..ddd10ba956 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptorTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.apache.commons.codec.binary.Hex; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.security.util.EncryptionMethod; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; + +public class OpenPGPPasswordBasedEncryptorTest { + private static final Logger logger = LoggerFactory.getLogger(OpenPGPPasswordBasedEncryptorTest.class); + + private final File plainFile = new File("src/test/resources/TestEncryptContent/text.txt"); + private final File encryptedFile = new File("src/test/resources/TestEncryptContent/text.txt.asc"); + + private static final String PASSWORD = "thisIsABadPassword"; + private static final String LEGACY_PASSWORD = "Hello, World!"; + + @BeforeClass + public static void setUpOnce() throws Exception { + Security.addProvider(new BouncyCastleProvider()); + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testShouldEncryptAndDecrypt() throws Exception { + // Arrange + final String PLAINTEXT = "This is a plaintext message."; + logger.info("Plaintext: {}", PLAINTEXT); + InputStream plainStream = new java.io.ByteArrayInputStream(PLAINTEXT.getBytes("UTF-8")); + OutputStream cipherStream = new ByteArrayOutputStream(); + OutputStream recoveredStream = new ByteArrayOutputStream(); + + // No file, just streams + String filename = "tempFile.txt"; + + OpenPGPPasswordBasedEncryptor encryptor = new OpenPGPPasswordBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), PASSWORD.toCharArray(), filename); + + StreamCallback encryptionCallback = encryptor.getEncryptionCallback(); + StreamCallback decryptionCallback = encryptor.getDecryptionCallback(); + + // Act + encryptionCallback.process(plainStream, cipherStream); + + final byte[] cipherBytes = ((ByteArrayOutputStream) cipherStream).toByteArray(); + logger.info("Encrypted: {}", Hex.encodeHexString(cipherBytes)); + InputStream cipherInputStream = new ByteArrayInputStream(cipherBytes); + + decryptionCallback.process(cipherInputStream, recoveredStream); + + // Assert + byte[] recoveredBytes = ((ByteArrayOutputStream) recoveredStream).toByteArray(); + String recovered = new String(recoveredBytes, "UTF-8"); + logger.info("Recovered: {}", recovered); + assert PLAINTEXT.equals(recovered); + } + + @Test + public void testShouldDecryptExternalFile() throws Exception { + // Arrange + byte[] plainBytes = Files.readAllBytes(Paths.get(plainFile.getPath())); + final String PLAINTEXT = new String(plainBytes, "UTF-8"); + + InputStream cipherStream = new FileInputStream(encryptedFile); + OutputStream recoveredStream = new ByteArrayOutputStream(); + + // No file, just streams + String filename = encryptedFile.getName(); + + OpenPGPPasswordBasedEncryptor encryptor = new OpenPGPPasswordBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), LEGACY_PASSWORD.toCharArray(), filename); + + StreamCallback decryptionCallback = encryptor.getDecryptionCallback(); + + // Act + decryptionCallback.process(cipherStream, recoveredStream); + + // Assert + byte[] recoveredBytes = ((ByteArrayOutputStream) recoveredStream).toByteArray(); + String recovered = new String(recoveredBytes, "UTF-8"); + logger.info("Recovered: {}", recovered); + Assert.assertEquals("Recovered text", PLAINTEXT, recovered); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/pubring.gpg b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/pubring.gpg new file mode 100644 index 0000000000000000000000000000000000000000..c32ef145f12777e7a501349ac5745083466ec71b GIT binary patch literal 1250 zcmV<81ReXC0SyFJdISUk2mqmMGphCKz2 zCZmFgGfy+eGXykI5VOU6WR|L=L~p{=)I8%lJ;@Xq;ecNB*f~!9BIz&Q%--TW0`i%0 zf?{a^@msOLNgEoFL*3#cIn3qxn7p0rR)BjBm3T>i&fCpP9FcgfkT-yMv0c@;(-Og zH5dKV%WAc4jR5FJ>WQ36t2}PJi`4(UIRW~~>jWldJVkCCKyRQ(U{Kk8@_Syzqocyg z!RNn1j!!0G5mZMMTzvo$0RRECTuy05X&_KXP#{!gb95j}WqBYdQ)q8;bRcwPb95kU zWqBZGZ*m|`X+~)vP)AT8b#7^NAarGObaN>nJYj5aaB^jHbZ;(UaA9L;Wh-=Lb96vw zZDDC_E@N+PK8XQ01QP)Y03iheR(b>k0viJb3ke7Z0|gZd2?z@X76JnS00JHX0vCV) z3JDP7hEG!&Fw-6p^9TP%8$fY( zuDDYSSIN3;_E*ajnr@abIm4c0uNj)wP_8Afq?T8}m5RWd)+5S|@!bHWx>)vf%7uAB ziJ=xM@Bpn--UePkM&`5_h7I1TYi9^PpNua!+A6=V~3I^`asyYNXU z*QELw!b$EVciccmv38L1TtT1A%U24Yn=g_#JKZX~><+)g4INfK9XtsEd|xo=D|6k9 z&f*#=H$Bh*z1DrO0ssTK0SyFJdISUk2ms4WQHhh={M*SX?!Hadr-U@*e{J|0(p(ZR z>TdTk=Z|#?XBzpEejD^;5d9#?pAp905Z=G(5iFakRH2r^NC)#M7=&`X!=El7y&X9t z4RoOb-0R6j)mrtEH0m*eTqQOW7hjri^m1KChK_07KGw(RLFUL`F2x+LbR<1tyUXqm z8v@?@u>P%0H?dxiOwX)bLK-1ovi1 z1ve1hOO6hzXK>G`%+fY_%$j?{k@o-*0RRDs0Urby0SW*K1p-!j1Ox&b3;+rV5aWhV zQyDPR9<(n9{vEIan|%%i!mYI|NkdC4(z z^?|g5WY`*B`;5>;*w?s3$-=O>H41A->V_Srnl;Z0W#cghFN926s^W2IrYIpvWMCNA z%W%}(Hc1KSoBr87#~ub!s)i2N03^Bel8zc8DjK)_qWgD*3nK=Atu(eLkQC}t;k^UL zN;(9HbkMl^e$bKzrWrAR^Na+hN38iD-Z?C{Z?)iS;dbn5w`nRCxHp6+hv~R2>&?dR zR8h^D!vO~@BM;-y>ob&f%V(8(2~=v>3ma?af}V^!16r?qa^O+X*uo8AbYM4Tis0h& M@vzc+8n6NY1N`tWdjJ3c literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/secring.gpg b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestEncryptContent/secring.gpg new file mode 100644 index 0000000000000000000000000000000000000000..26af32ab31d446d2d51150253ed43217d6119e4e GIT binary patch literal 2628 zcmV-K3cK}{1HJ@SdISUk2mqmMGphCKz2 zCZmFgGfy+eGXykI5VOU6WR|L=L~p{=)I8%lJ;@Xq;ecNB*f~!9BIz&Q%--TW0`i%0 zf?{a^@msOLNgEoFL*3#cIn3qxn7p0rR)BjBm3T>i&fCpP9FcgfkT-yMv0c@;(-Og zH5dKV%WAc4jR5FJ>WQ36t2}PJi`4(UIRW~~>jWldJVkCCKyRQ(U{Kk8@_Syzqocyg z!RNn1j!!0G5mZMMTzvo$0RRF10|Nq74w5te2E2*mUN6yT1akC3zyevY9dd$RX_9l8 zdV+K{2966&=wzgeqEz`*)+)0OLw69^wmBP+7QFjK~ zsSyhe#UKFcw7@eFX@`^ONdGUa`HgLh_>#AK_V0TbA4An$yEkC$CI<)4f|h$cFS-_r z;&0odV*UH2nxa03)Tio9;Ij)iE8{84>bqw)PW2G%?6ax(+Kd`uY;F|XEs2b0_Yfq| z6Rcl~T0%iW2B$54WBryJWx|8 zA_jG8Ke1Mj{ntv^zi}DYMDG#C)}~p&H2KCBB%gJ`x@j6E=(EjezUj!M|Cm-OZwK7f zJY^nqYpa+7$iu%jg)Ge~6y+8tTuYDMTdTk7BHnWc2M4zd_gTzniJiB;UqpQWJgJ$I zDZ=vN;sLinKWdF{_?^+B^j_wZH0W@ukEUoZt;vf7()z`b0;x9Y6Y7DaN0>b?f9s5d z6}QDx(K41t4Yr+V3PVLK>z(DB4y-dLjkuQ)tjwYx`R{F{8MZp!qS;4Y-JE(q9}mT< z2x9-a_nIz;WP9YSvt_AwHX*gsn#gTUy?k5Yu0M-vs?5Vra_t#@AmTU-D>uLh8|H$p z5CUf{Aj9s~@0w2((j2-(Ya#;OAX!(diBI9Uzux6@v|LVUMrj~WM^GSCWpi{OOJ#W= zC{t)}a&#bcWpi{OYh`&LW^ZyJPH9GIAW%n8Aa!nObRcwPb98elAUt7gZ*X#Db98Sm zVQ^t%Xk{yOWpi{uXKi6=Y%XJOZ9a(sHv|&_3IHJm0#T)=;h`ucVe&z?F)?o7N-B zjq%+8rMg)5bjpQ!L5ZOjD)K|1vz>6veQ*cM%&kW=Brop1R7iyV6qaOkbDz@;v?oUM zwm6IwuTA!Bt~JF9L>$cO#k*n-3Z>02pv{CcFra~kAbn&SoB1IX);JCEN*>;62CtP( zA{As4b~@!AqPy@(DA%O=7{W>JBzN3EMzMB~@?1fm%*$5_pPMg|H#^-byX+3X#0?!* zJ{>#>0eoLD=qq#GjLza3DK|aP0KL|IumS)8oddoER(b>k0SExgOi_uG+x*+fDek^a z)~AFtVrtyu+U^AiW(qBMo$+0^IA#Mb%pMk~Hcuf?Opw6Bl2aaP)FrM~04R-9FaG=t1Vl zUoOQQuXH3mVY|!j4;upB`>_74O*gS#k4(?3TtXTlU$hD=nxtY3qX6G9KCU~ z;Xz zb%nA^S;XV6a6RDQ`gxTYL?=GvM@7NbUn5e*I-rAQbc?{ocu;j~L80sbJnUyP7)H;6d67Ex) z*lB&8HE_K9ZL`a0w1M8(9y7R?7qc}H1l2W;L6+8Y5B2Etb+SKewl)O2P5Bmgs+j!{ z_%(5(fXh4g^Y0W804~yS!lFqXJV4@p+;1YpSl~n1wT(_gN<>)u&Z%}nUy9B_gusvU zTM?Ra9UUr@FOo&tCZDy2?dc(Mp!&focg__+!`9$08$My(Osy@?u79V6+fbWFMMe{= z$*;bv45ap3HoVVmy&O180YjSozlGNa(JX|LKRRyC`z|iSyHg{5|Bqm(?s6Wj2aXTT zw{5hyunm~NRr*j5-H+vugu@Z~m&|2*hBa5VZL24XS}{#MhQ?mt6g4DcmSF`2@N@5g z?JTt2Qp0}(eFLb;_@{#W6mhR9udxuSEEW78*#AEF?KjyfrcIpW@XM5e zGRZDQ5s-!u`fFbiy8VDy%5tYVZhGj=xs*jk0vikf3JDP7hEG!&Fw-8iF9-f4KeI*}^DmI2Jp%YAX;XlJ zh44#U&pBRD{hlm4_Vdi6xv*+`UO#!sF?02Sw1Z^W8eaR1&_vkRxJ1dqu(&k}Ye(va z9j2N!&kJSaF$OP$Ok1kracHI}AxUIl7}v{i)Z8{n3Fw>t**(V|22!eq4%YxAx%85b z8X_thxBa5~cZ3Tg27s+JwkMDj>Qdpo1IJ1_1c-FdxcYw3k_M(3F@N)n1g1x<`5)dn zEVpm9;A`P_>}t1ZDi*jmgeQmTxGn3=#_v>7&6&di2P`8G3vJTB#Q)COEqOGvOJ;WxjwQV~oJ0s1AU40(GJ{K(Sk0R`5_gv{$&%uYKb*a4DIM^r1&p#AtRrQOfkEs-6% z+KMLef1h?V&(;&+di5xb(CXcaEg_(i5SN*K>>gB<$&czjoE1TolQjzF8JtlP(u3y2 z-Yi}Udt~-GwrBUhh?TwBOv`L6Y^2*Ybj9%z1A4Li+ofoPx)3g5<2E5h+=os=7=wT` zQm)r}e@0y*FBOYf8>0W2D5~Z4gW`<}rIVbS1keQHR|3x`-dm)kH&`f89|R##e|q?v ziH2rzDksSdfa;W8z$WR0rnpYwdk0My2z>y&c%0Ep@XFN@wZjeMWyYpdKyviEK?q}( zUw<)o1ZNF~(RPw%(3_G%ynLq1Bow&cB#Vv|x`u(0b1ZUVf$KVh3OWAv96L;3LvP@U zFpqB(1}##rrJ^$>IT*QU$xQ(yc6+7sZFa##C(EU5wGzb7>P&3Y^$xZp`j%WIL&a$ldWFxq)rUlys z$9Uja%#dZ8mq$k0P3JHJ`3>jQmBq&Q1{%|BVAe~k&{AN`dDO%qk4PXXPyJ`ns^}Fm zM#~&B6R^4S;7;k2XyY=?$L6v_vRqG&N_Nv3=BflxBf-<@O@K5&L#&)4B5e9kp+SGR zkW`VbIfDITm)+F=sALSqbSr?4e6hH89czCY5yj7N(+srNFdDfn1}kXRe-r5u^$mG&iSyG>D{Kb=;2{Fo2smUe>~M(RXo< zsHtCm<8tErdL~4vZkM4nNw}6LEmj3O7F;k?^L5>kSvgGgI<0F(Z~Fb8#9C2KUw)tDEt5?Zw}O( z&cQTvY>zDPcWlaJb>xh`=zw0gk3%N5^!rz9lO48{i3ZG|4(Av1Q?i0*4gLxK=($+- z01~x1ZU8+}`5X|GKP>Aqj2loU$ys|A^v&l!)k0^s+I{N>IS;tX3$9C(W(wn$iG29~XI68{kJPI*~q!mT?C?{ac1 z01orhw}$~pEdLB-n7wk6a*HALZL;PLZ^Gu7+a7<)QP6fF zOR<=M+k*~!yYZcmquGwBd%^JG^+qGjlkoZ7i?L~USUfa6&sYn9+is-l`*Z$vz+-_~ zJ36kHYIbYkHj3{Uz#QNYzaqFEACvaa36bj|mZCF%escQkYLChi87cS3&g=lK6oPYk{rF@+8WFNR<$(YEsl(B9oxd`>`< zCfjc!*^Z}1=h7Y|f&$F3g!WiO)H5!LnU+uYiT?tFJaEdZ4y_LGZ5t?4*t38gmYE!` zL{BpGlTYQTCS~fCevOvfYbes`0gX2;Q1l%6B$Oeyu;sZA0vK_vJJ3*+=6TE#Ag1(lMmA{Xd;Ui$ai>h<{$lyR4h=bqjH`gLcPskj|(BTBu9< zy*xrSQHS%>*5gKR-;+B_$fIK&l|6yyOslK;`i7KMuAHSf4m;I=D+qlFXF$36p~TMJ zF@u#lEMrzCsz4UciEC}A1lxn6#TINyis3PhD7r2R?yci3vBJBspo(v~EY^H@hZZ4r zejDl9>KLsNG8tk>t^^cNF z>JU7lWlQEFSdck!nidTTkfz-(gwPio()|BcYdxT3syaFtJUH@&6+3dLx7df!7h8ug{9!7qv+z!`4*({{PNJ5oT1$F1dvriZqMQ#3# C&YOw= literal 0 HcmV?d00001 diff --git a/pom.xml b/pom.xml index 2af004d6d3..3e3aa5902d 100644 --- a/pom.xml +++ b/pom.xml @@ -241,13 +241,18 @@ language governing permissions and limitations under the License. --> org.bouncycastle - bcprov-jdk16 - 1.46 + bcprov-jdk15on + 1.53 org.bouncycastle - bcpg-jdk16 - 1.46 + bcpg-jdk15on + 1.53 + + + org.bouncycastle + bcpkix-jdk15on + 1.53 com.jcraft From 44ffddd8b9da43ca27de09bc70fbf3e694649c05 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Tue, 12 Jan 2016 09:42:31 -0500 Subject: [PATCH 04/10] NIFI-1324: - Addressing lengthy lines contrib-check issue. - This closes #162 --- .../standard/util/OpenPGPKeyBasedEncryptor.java | 3 ++- .../apache/nifi/processors/standard/util/PGPUtil.java | 4 +++- .../standard/util/OpenPGPKeyBasedEncryptorTest.java | 9 ++++++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java index b97e416101..0364f1c79b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java @@ -97,7 +97,8 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { /** * Returns true if the passphrase is valid. *

- * This is used in the EncryptContent custom validation to check if the passphrase can extract a private key from the secret key ring. After BC was upgraded from 1.46 to 1.53, the API changed so this is performed differently but the functionality is equivalent. + * This is used in the EncryptContent custom validation to check if the passphrase can extract a private key from the secret key ring. After BC was upgraded from 1.46 to 1.53, the API changed + * so this is performed differently but the functionality is equivalent. * * @param provider the provider name * @param secretKeyringFile the file path to the keyring diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java index b1ee11f650..13190ff3c8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PGPUtil.java @@ -46,7 +46,9 @@ public class PGPUtil { public static final int BUFFER_SIZE = 65536; public static final int BLOCK_SIZE = 4096; - public static void encrypt(InputStream in, OutputStream out, String algorithm, String provider, int cipher, String filename, PGPKeyEncryptionMethodGenerator encryptionMethodGenerator) throws IOException, PGPException { + public static void encrypt(InputStream in, OutputStream out, String algorithm, String provider, int cipher, String filename, + PGPKeyEncryptionMethodGenerator encryptionMethodGenerator) throws IOException, PGPException { + final boolean isArmored = EncryptContent.isPGPArmoredAlgorithm(algorithm); OutputStream output = out; if (isArmored) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java index fc7a2f6d4e..01158477d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptorTest.java @@ -79,10 +79,12 @@ public class OpenPGPKeyBasedEncryptorTest { String filename = "tempFile.txt"; // Encryptor does not require password - OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), PUBLIC_KEYRING_PATH, USER_ID, new char[0], filename); + OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor( + EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), PUBLIC_KEYRING_PATH, USER_ID, new char[0], filename); StreamCallback encryptionCallback = encryptor.getEncryptionCallback(); - OpenPGPKeyBasedEncryptor decryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); + OpenPGPKeyBasedEncryptor decryptor = new OpenPGPKeyBasedEncryptor( + EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); StreamCallback decryptionCallback = decryptor.getDecryptionCallback(); // Act @@ -113,7 +115,8 @@ public class OpenPGPKeyBasedEncryptorTest { // No file, just streams String filename = unsignedFile.getName(); - OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor(EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); + OpenPGPKeyBasedEncryptor encryptor = new OpenPGPKeyBasedEncryptor( + EncryptionMethod.PGP.getAlgorithm(), EncryptionMethod.PGP.getProvider(), SECRET_KEYRING_PATH, USER_ID, PASSWORD.toCharArray(), filename); StreamCallback decryptionCallback = encryptor.getDecryptionCallback(); From 287d3ef531a745ce5f0d59e0acb1ec4754d9d5ee Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 12 Jan 2016 10:58:27 -0500 Subject: [PATCH 05/10] This closes #166 From 133838a93fa5a5771e9812a14845003ad8782c96 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 12 Jan 2016 10:56:49 -0500 Subject: [PATCH 06/10] NIFI-1233 upgraded to Kafka 0.9.0.0 Signed-off-by: jpercivall --- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++--- .../org/apache/nifi/processors/kafka/KafkaUtils.java | 4 +++- .../org/apache/nifi/processors/kafka/PutKafka.java | 5 ++--- .../apache/nifi/processors/kafka/TestPutKafka.java | 11 +++++++++++ 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 2eee0506bd..fb59e693bc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ org.apache.kafka kafka-clients - 0.8.2.2 + 0.9.0.0 org.apache.kafka - kafka_2.9.1 - 0.8.2.2 + kafka_2.10 + 0.9.0.0 diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index 657d88b48e..d09ac4a1a7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -38,6 +39,7 @@ class KafkaUtils { */ static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) { ZkClient zkClient = new ZkClient(zookeeperConnectionString); + zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object o) throws ZkMarshallingError { @@ -50,7 +52,7 @@ class KafkaUtils { } }); scala.collection.Set topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); return topicMetadatas.size(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index b5766e4886..febb666d3a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,14 +20,15 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import scala.actors.threadpool.Arrays; - @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 17d1cc831a..e12ec2aeb4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -474,6 +475,16 @@ public class TestPutKafka { @Override public void close() { } + + @Override + public void close(long arg0, TimeUnit arg1) { + // ignore, not used in test + } + + @Override + public void flush() { + // ignore, not used in test + } } } From c757c4d6ead4b2a3f47d282fea3bd7f43de167c5 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Tue, 12 Jan 2016 19:16:25 -0500 Subject: [PATCH 07/10] NIFI-1385: Resolved test for missing file which was failing on Windows due to differing error message. Reviewed by Tony Kurc (tkurc@apache.org) --- .../org/apache/nifi/processors/standard/TestEncryptContent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java index 3be29760a5..08aa2d1e3f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java @@ -249,7 +249,7 @@ public class TestEncryptContent { // Assert Assert.assertEquals(1, results.size()); ValidationResult vr = (ValidationResult) results.toArray()[0]; - String expectedResult = " (No such file or directory)"; + String expectedResult = "java.io.FileNotFoundException"; String message = "'" + vr.toString() + "' contains '" + expectedResult + "'"; Assert.assertTrue(message, vr.toString().contains(expectedResult)); } From c3ac772b9230e3fc8053650142f0ad921fd98169 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Tue, 12 Jan 2016 19:37:25 -0500 Subject: [PATCH 08/10] NIFI-1324: bump version on bouncycastle artifacts from 1.53 to 1.54 Reviewed by Tony Kurc (tkurc@apache.org) This closes #170 --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 3e3aa5902d..95a86ec4aa 100644 --- a/pom.xml +++ b/pom.xml @@ -242,17 +242,17 @@ language governing permissions and limitations under the License. --> org.bouncycastle bcprov-jdk15on - 1.53 + 1.54 org.bouncycastle bcpg-jdk15on - 1.53 + 1.54 org.bouncycastle bcpkix-jdk15on - 1.53 + 1.54 com.jcraft From 92062f9beb2690dceec5876ab48ea2af0bc877b7 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 14 Jan 2016 00:42:36 -0500 Subject: [PATCH 09/10] NIFI-1317: removed duplicate 'name' instance variable Reviewed by Tony Kurc (tkurc@apache.org). This closes #169 --- .../nifi/controller/AbstractConfiguredComponent.java | 3 ++- .../apache/nifi/controller/StandardProcessorNode.java | 9 +-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 978c612fbb..df93cc0838 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -45,7 +45,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final ValidationContextFactory validationContextFactory; private final ControllerServiceProvider serviceProvider; - private final AtomicReference name = new AtomicReference<>(); + private final AtomicReference name; private final AtomicReference annotationData = new AtomicReference<>(); private final Lock lock = new ReentrantLock(); @@ -57,6 +57,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone this.component = component; this.validationContextFactory = validationContextFactory; this.serviceProvider = serviceProvider; + this.name = new AtomicReference<>(component.getClass().getSimpleName()); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 567c3c782c..aade16b343 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -93,7 +93,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicBoolean lossTolerant; private final AtomicReference scheduledState; private final AtomicReference comments; - private final AtomicReference name; private final AtomicReference position; private final AtomicReference annotationData; private final AtomicReference schedulingPeriod; // stored as string so it's presented to user as they entered it @@ -134,7 +133,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final Set emptySetOfRelationships = new HashSet<>(); undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); comments = new AtomicReference<>(""); - name = new AtomicReference<>(processor.getClass().getSimpleName()); schedulingPeriod = new AtomicReference<>("0 sec"); schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD); @@ -344,11 +342,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return Collections.unmodifiableSet(relationships); } - @Override - public String getName() { - return name.get(); - } - /** * @return the value of the processor's {@link CapabilityDescription} annotation, if one exists, else null. */ @@ -375,7 +368,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } - this.name.set(name); + super.setName(name); } finally { writeLock.unlock(); } From ecb81ec113e20f3850fd3f7080da2360035cabd8 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Thu, 14 Jan 2016 23:16:55 -0500 Subject: [PATCH 10/10] NIFI-1362 Set mime.type attribute on response FlowFile based on InvokeHTTP response Content-Type Signed-off-by: Aldrin Piri --- .../nifi/processors/standard/InvokeHTTP.java | 8 +++- .../standard/util/TestInvokeHttpCommon.java | 41 +++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 4e5c22d4b9..44f76e5bb7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -556,7 +556,7 @@ public final class InvokeHTTP extends AbstractProcessor { throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); } - // Create a map of the status attributes that are always written to the request and reponse FlowFiles + // Create a map of the status attributes that are always written to the request and response FlowFiles Map statusAttributes = new HashMap<>(); statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); statusAttributes.put(STATUS_MESSAGE, statusMessage); @@ -602,7 +602,7 @@ public final class InvokeHTTP extends AbstractProcessor { responseFlowFile = session.create(); } - // write the status attributes + // write attributes to response flowfile responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); // write the response headers as attributes @@ -612,6 +612,10 @@ public final class InvokeHTTP extends AbstractProcessor { // transfer the message body to the payload // can potentially be null in edge cases if (bodyExists) { + // write content type attribute to response flowfile if it is available + if (responseBody.contentType() != null) { + responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString()); + } if (teeInputStream != null) { responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); } else { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index cf14e0854e..3464f145d2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -225,6 +225,47 @@ public abstract class TestInvokeHttpCommon { bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); } + @Test + public void testOutputResponseSetMimeTypeToResponseContentType() throws Exception { + addHandler(new GetOrHeadHandler()); + + String statusUrl = "/status/200"; + runner.setProperty(InvokeHTTP.PROP_URL, url + statusUrl); + runner.setProperty(InvokeHTTP.PROP_METHOD, "GET"); + runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true"); + runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("outputBody", statusUrl); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals(statusUrl.getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + bundle1.assertAttributeEquals("mime.type", "text/plain; charset=ISO-8859-1"); + } + @Test public void testOutputResponseRegardlessWithOutputInAttributeLarge() throws Exception { addHandler(new GetLargeHandler());