mirror of https://github.com/apache/nifi.git
NIFI-5989 - PutKudu: Additional FF Queue length setting
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
4e914cea1f
commit
29d54126e4
|
@ -128,9 +128,21 @@ public class PutKudu extends AbstractProcessor {
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("FlowFiles per Batch")
|
||||
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
|
||||
"Depending on your memory size, and data size per row set an appropriate batch size " +
|
||||
"for the number of FlowFiles to process per client connection setup." +
|
||||
"Gradually increase this number, only if your FlowFiles typically contain a few records.")
|
||||
.defaultValue("1")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Batch Size")
|
||||
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
|
||||
.description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
|
||||
"Depending on your memory size, and data size per row set an appropriate batch size. " +
|
||||
"Gradually increase this number to find out the best one for best performances.")
|
||||
.defaultValue("100")
|
||||
|
@ -139,6 +151,7 @@ public class PutKudu extends AbstractProcessor {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
|
||||
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
|
||||
|
@ -153,6 +166,7 @@ public class PutKudu extends AbstractProcessor {
|
|||
protected OperationType operationType;
|
||||
protected SessionConfiguration.FlushMode flushMode;
|
||||
protected int batchSize = 100;
|
||||
protected int ffbatch = 1;
|
||||
|
||||
protected KuduClient kuduClient;
|
||||
protected KuduTable kuduTable;
|
||||
|
@ -166,6 +180,7 @@ public class PutKudu extends AbstractProcessor {
|
|||
properties.add(RECORD_READER);
|
||||
properties.add(INSERT_OPERATION);
|
||||
properties.add(FLUSH_MODE);
|
||||
properties.add(FLOWFILE_BATCH_SIZE);
|
||||
properties.add(BATCH_SIZE);
|
||||
|
||||
return properties;
|
||||
|
@ -186,6 +201,7 @@ public class PutKudu extends AbstractProcessor {
|
|||
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
|
||||
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
|
||||
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
|
||||
|
||||
getLogger().debug("Setting up Kudu connection...");
|
||||
|
@ -209,7 +225,7 @@ public class PutKudu extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final List<FlowFile> flowFiles = session.get(batchSize);
|
||||
final List<FlowFile> flowFiles = session.get(ffbatch);
|
||||
if (flowFiles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -480,6 +480,7 @@ public class TestPutKudu {
|
|||
setUpTestRunner(testRunner);
|
||||
testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name());
|
||||
testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize));
|
||||
testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, String.valueOf(batchSize));
|
||||
|
||||
IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue(""));
|
||||
testRunner.run(numFlowFiles);
|
||||
|
|
Loading…
Reference in New Issue