From 29d54126e4afac1e984912b1ccb0a5fb222c7928 Mon Sep 17 00:00:00 2001 From: Alex Goos Date: Wed, 30 Jan 2019 17:31:20 +0100 Subject: [PATCH] NIFI-5989 - PutKudu: Additional FF Queue length setting Signed-off-by: Mark Payne --- .../apache/nifi/processors/kudu/PutKudu.java | 20 +++++++++++++++++-- .../nifi/processors/kudu/TestPutKudu.java | 1 + 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 960531508f..1a1d9b8e41 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -128,9 +128,21 @@ public class PutKudu extends AbstractProcessor { .required(true) .build(); + protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("FlowFiles per Batch") + .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " + + "Depending on your memory size, and data size per row set an appropriate batch size " + + "for the number of FlowFiles to process per client connection setup. " + + "Gradually increase this number, only if your FlowFiles typically contain a few records.") + .defaultValue("1") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 100000, true)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") - .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. 
" + + .description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " + "Depending on your memory size, and data size per row set an appropriate batch size. " + "Gradually increase this number to find out the best one for best performances.") .defaultValue("100") @@ -139,6 +151,7 @@ public class PutKudu extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + protected static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu") @@ -153,6 +166,7 @@ public class PutKudu extends AbstractProcessor { protected OperationType operationType; protected SessionConfiguration.FlushMode flushMode; protected int batchSize = 100; + protected int ffbatch = 1; protected KuduClient kuduClient; protected KuduTable kuduTable; @@ -166,6 +180,7 @@ public class PutKudu extends AbstractProcessor { properties.add(RECORD_READER); properties.add(INSERT_OPERATION); properties.add(FLUSH_MODE); + properties.add(FLOWFILE_BATCH_SIZE); properties.add(BATCH_SIZE); return properties; @@ -186,6 +201,7 @@ public class PutKudu extends AbstractProcessor { final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); getLogger().debug("Setting up Kudu connection..."); @@ -209,7 +225,7 @@ public class PutKudu extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final List flowFiles = 
session.get(batchSize); + final List flowFiles = session.get(ffbatch); if (flowFiles.isEmpty()) { return; } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 041b506486..51908f227e 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -480,6 +480,7 @@ public class TestPutKudu { setUpTestRunner(testRunner); testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name()); testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize)); + testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, String.valueOf(batchSize)); IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue("")); testRunner.run(numFlowFiles);