NIFI-10873 - GenerateFlowFile: flowfiles in a batch are not unique

This closes #6717.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Arpad Boda 2022-11-24 16:26:31 +01:00 committed by Peter Turcsanyi
parent 5f1d93f977
commit a469c92f21
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
1 changed files with 13 additions and 9 deletions

View File

@ -219,13 +219,16 @@ public class GenerateFlowFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final byte[] data;
if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
data = generateData(context);
} else if(context.getProperty(CUSTOM_TEXT).isSet()) {
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
data = context.getProperty(CUSTOM_TEXT).evaluateAttributeExpressions().getValue().getBytes(charset);
final boolean uniqueData = context.getProperty(UNIQUE_FLOWFILES).asBoolean();
if (uniqueData) {
data = new byte[0];
} else {
data = this.data.get();
if (context.getProperty(CUSTOM_TEXT).isSet()) {
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
data = context.getProperty(CUSTOM_TEXT).evaluateAttributeExpressions().getValue().getBytes(charset);
} else {
data = this.data.get();
}
}
Map<PropertyDescriptor, String> processorProperties = context.getProperties();
@ -243,12 +246,13 @@ public class GenerateFlowFile extends AbstractProcessor {
}
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
FlowFile flowFile = session.create();
if (data.length > 0) {
FlowFile flowFile = session.create();
final byte[] writtenData = uniqueData ? generateData(context) : data;
if (writtenData.length > 0) {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(data);
out.write(writtenData);
}
});
}