From 0c0c33411d586af8c0c195181747e9d55709a574 Mon Sep 17 00:00:00 2001 From: cam Date: Thu, 14 Sep 2017 15:29:08 -0700 Subject: [PATCH] NIFI-4384 - Enhance PutKudu processor to support batch insert Signed-off-by: Pierre Villard This closes #2160. --- .../nifi/processors/kudu/AbstractKudu.java | 43 +++++++++++++++++-- .../apache/nifi/processors/kudu/PutKudu.java | 4 +- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java index 5019e03bf4..359e8177fc 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -28,6 +28,7 @@ import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Insert; import org.apache.kudu.client.Upsert; +import org.apache.kudu.client.SessionConfiguration; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -60,6 +61,7 @@ public abstract class AbstractKudu extends AbstractProcessor { .description("List all kudu masters's ip with port (e.g. 7051), comma separated") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() @@ -67,6 +69,7 @@ public abstract class AbstractKudu extends AbstractProcessor { .description("The name of the Kudu Table to put data into") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -94,6 +97,29 @@ public abstract class AbstractKudu extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder() + .name("Flush Mode") + .description("Set the new flush mode for a kudu session.\n" + + "AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" + + "AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" + + " operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" + + "MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.") + .allowableValues(SessionConfiguration.FlushMode.values()) + .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()) + .required(true) + .build(); + + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Set the number of operations that can be buffered, between 2 - 100000. " + + "Depending on your memory size, and data size per row set an appropriate batch size. " + + "Gradually increase this number to find out the best one for best performances.") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.createLongValidator(2, 100000, true)) + .expressionLanguageSupported(true) + .build(); + protected static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu") @@ -109,6 +135,8 @@ public abstract class AbstractKudu extends AbstractProcessor { protected String tableName; protected boolean skipHeadLine; protected OperationType operationType; + protected SessionConfiguration.FlushMode flushMode; + protected int batchSize = 100; protected KuduClient kuduClient; protected KuduTable kuduTable; @@ -116,19 +144,22 @@ public abstract class AbstractKudu extends AbstractProcessor { @OnScheduled public void OnScheduled(final ProcessContext context) { try { - tableName = context.getProperty(TABLE_NAME).getValue(); - kuduMasters = context.getProperty(KUDU_MASTERS).getValue(); + tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); if(kuduClient == null) { getLogger().debug("Setting up Kudu connection..."); kuduClient = getKuduConnection(kuduMasters); kuduTable = this.getKuduTable(kuduClient, tableName); getLogger().debug("Kudu connection successfully initialized"); } + + operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); + batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); + skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean(); } catch(KuduException ex){ getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); } - operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); - skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean(); } @OnStopped @@ -223,6 +254,10 @@ public abstract class AbstractKudu extends AbstractProcessor { protected KuduSession getKuduSession(KuduClient client){ KuduSession kuduSession = client.newSession(); + + kuduSession.setMutationBufferSpace(batchSize); + kuduSession.setFlushMode(flushMode); + if(operationType == OperationType.INSERT_IGNORE){ kuduSession.setIgnoreAllDuplicateRows(true); } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 53fc678087..313e49b9d8 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -43,7 +43,7 @@ import java.util.Set; @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@Tags({"put", "database", "NoSQL", "kudu", "HDFS"}) +@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"}) @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " + "to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." + " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure") @@ -58,6 +58,8 @@ public class PutKudu extends AbstractKudu { properties.add(SKIP_HEAD_LINE); properties.add(RECORD_READER); properties.add(INSERT_OPERATION); + properties.add(FLUSH_MODE); + properties.add(BATCH_SIZE); return properties; }