NIFI-13283 - Fix exception thrown in PutKinesisFirehose processor by ensuring value exists before referencing

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8867.
This commit is contained in:
Evan Shelton 2024-05-22 14:08:04 -04:00 committed by Pierre Villard
parent 969b332fb0
commit 99253b9673
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 1 additions and 1 deletions

View File

@ -130,9 +130,9 @@ public class PutKinesisFirehose extends AbstractAwsSyncProcessor<FirehoseClient,
for (final FlowFile flowFile : flowFiles) {
final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>());
session.read(flowFile, in -> recordHash.get(firehoseStreamName).add(Record.builder().data(SdkBytes.fromInputStream(in)).build()));
recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>());
final List<FlowFile> flowFilesForStream = hashFlowFiles.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>());
flowFilesForStream.add(flowFile);
}