NIFI-11437 Switched to StreamUtils.fillBuffer() for buffer, Improved EncryptContentPGP Content Type Detection

This closes #7166
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2023-04-12 13:00:40 -05:00 committed by Paul Grey
parent 061f3a1380
commit bc5f00a667
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
2 changed files with 44 additions and 43 deletions

View File

@ -29,12 +29,12 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.pgp.service.api.PGPPublicKeyService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.pgp.attributes.CompressionAlgorithm;
@ -57,9 +57,11 @@ import org.bouncycastle.openpgp.operator.bc.BcPGPDataEncryptorBuilder;
import org.bouncycastle.openpgp.operator.bc.BcPublicKeyKeyEncryptionMethodGenerator;
import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@ -198,14 +200,10 @@ public class EncryptContentPGP extends AbstractProcessor {
}
try {
final PacketReadInputStreamCallback packetCallback = new PacketReadInputStreamCallback();
session.read(flowFile, packetCallback);
final SymmetricKeyAlgorithm symmetricKeyAlgorithm = getSymmetricKeyAlgorithm(context);
final FileEncoding fileEncoding = getFileEncoding(context);
final CompressionAlgorithm compressionAlgorithm = getCompressionAlgorithm(context);
final StreamCallback callback = getEncryptStreamCallback(context, flowFile, symmetricKeyAlgorithm,
compressionAlgorithm, fileEncoding, packetCallback.packetFound);
final StreamCallback callback = getEncryptStreamCallback(context, flowFile, symmetricKeyAlgorithm, compressionAlgorithm, fileEncoding);
flowFile = session.write(flowFile, callback);
final Map<String, String> attributes = getAttributes(symmetricKeyAlgorithm, fileEncoding, compressionAlgorithm);
@ -267,8 +265,7 @@ public class EncryptContentPGP extends AbstractProcessor {
final FlowFile flowFile,
final SymmetricKeyAlgorithm symmetricKeyAlgorithm,
final CompressionAlgorithm compressionAlgorithm,
final FileEncoding fileEncoding,
final boolean packetFound) {
final FileEncoding fileEncoding) {
final SecureRandom secureRandom = new SecureRandom();
final PGPDataEncryptorBuilder dataEncryptorBuilder = new BcPGPDataEncryptorBuilder(symmetricKeyAlgorithm.getId())
.setSecureRandom(secureRandom)
@ -278,7 +275,7 @@ public class EncryptContentPGP extends AbstractProcessor {
methodGenerators.forEach(encryptedDataGenerator::addMethod);
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, filename, packetFound, encryptedDataGenerator);
return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, filename, getLogger(), encryptedDataGenerator);
}
private List<PGPKeyEncryptionMethodGenerator> getEncryptionMethodGenerators(final ProcessContext context,
@ -338,43 +335,18 @@ public class EncryptContentPGP extends AbstractProcessor {
return attributes;
}
private class PacketReadInputStreamCallback implements InputStreamCallback {
private boolean packetFound;
/**
* Process Input Stream and attempt to read OpenPGP Packet for content detection
*
* @param inputStream Input Stream to be read
*/
@Override
public void process(final InputStream inputStream) {
try {
final InputStream decodedInputStream = PGPUtil.getDecoderStream(inputStream);
final BCPGInputStream packetInputStream = new BCPGInputStream(decodedInputStream);
final Packet packet = packetInputStream.readPacket();
if (packet == null) {
getLogger().debug("PGP Packet not found");
} else {
packetFound = true;
}
} catch (final IOException e) {
getLogger().debug("PGP Packet read failed", e);
}
}
}
private static class EncryptStreamCallback extends EncodingStreamCallback {
private final boolean packetFound;
private final PGPEncryptedDataGenerator encryptedDataGenerator;
private final ComponentLog logger;
public EncryptStreamCallback(final FileEncoding fileEncoding,
final CompressionAlgorithm compressionAlgorithm,
final String filename,
final boolean packetFound,
final ComponentLog logger,
final PGPEncryptedDataGenerator encryptedDataGenerator) {
super(fileEncoding, compressionAlgorithm, filename);
this.packetFound = packetFound;
this.logger = logger;
this.encryptedDataGenerator = encryptedDataGenerator;
}
@ -388,15 +360,44 @@ public class EncryptContentPGP extends AbstractProcessor {
*/
@Override
protected void processEncoding(final InputStream inputStream, final OutputStream encodingOutputStream) throws IOException, PGPException {
try (final OutputStream encryptedOutputStream = encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())) {
if (packetFound) {
try (
final PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, OUTPUT_BUFFER_SIZE);
final OutputStream encryptedOutputStream = encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())
) {
if (isPacketFound(pushbackInputStream)) {
// Write OpenPGP packets to encrypted stream without additional encoding
StreamUtils.copy(inputStream, encryptedOutputStream);
StreamUtils.copy(pushbackInputStream, encryptedOutputStream);
} else {
super.processEncoding(inputStream, encryptedOutputStream);
super.processEncoding(pushbackInputStream, encryptedOutputStream);
}
}
encryptedDataGenerator.close();
}
private boolean isPacketFound(final PushbackInputStream pushbackInputStream) throws IOException {
boolean packetFound = false;
final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
final int bytesRead = StreamUtils.fillBuffer(pushbackInputStream, buffer, false);
logger.debug("PGP Packet search read buffer bytes [{}]", bytesRead);
try (
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(buffer);
final InputStream decodedInputStream = PGPUtil.getDecoderStream(byteArrayInputStream);
final BCPGInputStream packetInputStream = new BCPGInputStream(decodedInputStream)
) {
final Packet packet = packetInputStream.readPacket();
if (packet == null) {
logger.debug("PGP Packet not found");
} else {
packetFound = true;
}
} catch (final Exception e) {
logger.debug("PGP Packet read failed", e);
} finally {
pushbackInputStream.unread(buffer, 0, bytesRead);
}
return packetFound;
}
}
}

View File

@ -37,7 +37,7 @@ import java.util.Objects;
* Encoding Stream Callback handles writing PGP messages using configured properties
*/
public class EncodingStreamCallback implements StreamCallback {
private static final int OUTPUT_BUFFER_SIZE = 8192;
protected static final int OUTPUT_BUFFER_SIZE = 8192;
private final FileEncoding fileEncoding;