diff --git a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java index a45d90d1f0..9b676fd830 100644 --- a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java +++ b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java @@ -29,12 +29,12 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.pgp.service.api.PGPPublicKeyService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.pgp.attributes.CompressionAlgorithm; @@ -57,9 +57,11 @@ import org.bouncycastle.openpgp.operator.bc.BcPGPDataEncryptorBuilder; import org.bouncycastle.openpgp.operator.bc.BcPublicKeyKeyEncryptionMethodGenerator; import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PushbackInputStream; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; @@ -198,14 +200,10 @@ public class EncryptContentPGP extends AbstractProcessor { } try { - final PacketReadInputStreamCallback packetCallback = new PacketReadInputStreamCallback(); - session.read(flowFile, packetCallback); - final SymmetricKeyAlgorithm symmetricKeyAlgorithm = getSymmetricKeyAlgorithm(context); final FileEncoding fileEncoding = getFileEncoding(context); final CompressionAlgorithm compressionAlgorithm = getCompressionAlgorithm(context); - final StreamCallback callback = getEncryptStreamCallback(context, flowFile, symmetricKeyAlgorithm, - compressionAlgorithm, fileEncoding, packetCallback.packetFound); + final StreamCallback callback = getEncryptStreamCallback(context, flowFile, symmetricKeyAlgorithm, compressionAlgorithm, fileEncoding); flowFile = session.write(flowFile, callback); final Map attributes = getAttributes(symmetricKeyAlgorithm, fileEncoding, compressionAlgorithm); @@ -267,8 +265,7 @@ public class EncryptContentPGP extends AbstractProcessor { final FlowFile flowFile, final SymmetricKeyAlgorithm symmetricKeyAlgorithm, final CompressionAlgorithm compressionAlgorithm, - final FileEncoding fileEncoding, - final boolean packetFound) { + final FileEncoding fileEncoding) { final SecureRandom secureRandom = new SecureRandom(); final PGPDataEncryptorBuilder dataEncryptorBuilder = new BcPGPDataEncryptorBuilder(symmetricKeyAlgorithm.getId()) .setSecureRandom(secureRandom) @@ -278,7 +275,7 @@ public class EncryptContentPGP extends AbstractProcessor { methodGenerators.forEach(encryptedDataGenerator::addMethod); final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, filename, packetFound, encryptedDataGenerator); + return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, filename, getLogger(), encryptedDataGenerator); } private List getEncryptionMethodGenerators(final ProcessContext context, @@ -338,43 +335,18 @@ public class EncryptContentPGP extends AbstractProcessor { return attributes; } - private class PacketReadInputStreamCallback implements InputStreamCallback { - private boolean packetFound; - - /** - * Process Input Stream and attempt to read OpenPGP Packet for content detection - * - * @param inputStream Input Stream to be read - */ - @Override - public void process(final InputStream inputStream) { - try { - final InputStream decodedInputStream = PGPUtil.getDecoderStream(inputStream); - final BCPGInputStream packetInputStream = new BCPGInputStream(decodedInputStream); - final Packet packet = packetInputStream.readPacket(); - if (packet == null) { - getLogger().debug("PGP Packet not found"); - } else { - packetFound = true; - } - } catch (final IOException e) { - getLogger().debug("PGP Packet read failed", e); - } - } - } - private static class EncryptStreamCallback extends EncodingStreamCallback { - private final boolean packetFound; - private final PGPEncryptedDataGenerator encryptedDataGenerator; + private final ComponentLog logger; + public EncryptStreamCallback(final FileEncoding fileEncoding, final CompressionAlgorithm compressionAlgorithm, final String filename, - final boolean packetFound, + final ComponentLog logger, final PGPEncryptedDataGenerator encryptedDataGenerator) { super(fileEncoding, compressionAlgorithm, filename); - this.packetFound = packetFound; + this.logger = logger; this.encryptedDataGenerator = encryptedDataGenerator; } @@ -388,15 +360,44 @@ public class EncryptContentPGP extends AbstractProcessor { */ @Override protected void processEncoding(final InputStream inputStream, final OutputStream encodingOutputStream) throws IOException, PGPException { - try (final OutputStream encryptedOutputStream = encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())) { - if (packetFound) { + try ( + final PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, OUTPUT_BUFFER_SIZE); + final OutputStream encryptedOutputStream = encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer()) + ) { + if (isPacketFound(pushbackInputStream)) { // Write OpenPGP packets to encrypted stream without additional encoding - StreamUtils.copy(inputStream, encryptedOutputStream); + StreamUtils.copy(pushbackInputStream, encryptedOutputStream); } else { - super.processEncoding(inputStream, encryptedOutputStream); + super.processEncoding(pushbackInputStream, encryptedOutputStream); } } encryptedDataGenerator.close(); } + + private boolean isPacketFound(final PushbackInputStream pushbackInputStream) throws IOException { + boolean packetFound = false; + + final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE]; + final int bytesRead = StreamUtils.fillBuffer(pushbackInputStream, buffer, false); + logger.debug("PGP Packet search read buffer bytes [{}]", bytesRead); + try ( + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(buffer); + final InputStream decodedInputStream = PGPUtil.getDecoderStream(byteArrayInputStream); + final BCPGInputStream packetInputStream = new BCPGInputStream(decodedInputStream) + ) { + final Packet packet = packetInputStream.readPacket(); + if (packet == null) { + logger.debug("PGP Packet not found"); + } else { + packetFound = true; + } + } catch (final Exception e) { + logger.debug("PGP Packet read failed", e); + } finally { + pushbackInputStream.unread(buffer, 0, bytesRead); + } + + return packetFound; + } } } diff --git a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java index 1377e4e289..68ef8e88cc 100644 --- a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java +++ b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java @@ -37,7 +37,7 @@ import java.util.Objects; * Encoding Stream Callback handles writing PGP messages using configured properties */ public class EncodingStreamCallback implements StreamCallback { - private static final int OUTPUT_BUFFER_SIZE = 8192; + protected static final int OUTPUT_BUFFER_SIZE = 8192; private final FileEncoding fileEncoding;