NIFI-4384 - Enhance PutKudu processor to support batch insert

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2160.
This commit is contained in:
cam 2017-09-14 15:29:08 -07:00 committed by Pierre Villard
parent 329dbe3a64
commit 0c0c33411d
2 changed files with 42 additions and 5 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Insert; import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
@ -60,6 +61,7 @@ public abstract class AbstractKudu extends AbstractProcessor {
.description("List all kudu masters's ip with port (e.g. 7051), comma separated") .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
@ -67,6 +69,7 @@ public abstract class AbstractKudu extends AbstractProcessor {
.description("The name of the Kudu Table to put data into") .description("The name of the Kudu Table to put data into")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
@ -94,6 +97,29 @@ public abstract class AbstractKudu extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
.required(true)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("Set the number of operations that can be buffered, between 2 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size. " +
"Gradually increase this number to find out the best one for best performances.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(2, 100000, true))
.expressionLanguageSupported(true)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder() protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu") .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
@ -109,6 +135,8 @@ public abstract class AbstractKudu extends AbstractProcessor {
protected String tableName; protected String tableName;
protected boolean skipHeadLine; protected boolean skipHeadLine;
protected OperationType operationType; protected OperationType operationType;
protected SessionConfiguration.FlushMode flushMode;
protected int batchSize = 100;
protected KuduClient kuduClient; protected KuduClient kuduClient;
protected KuduTable kuduTable; protected KuduTable kuduTable;
@ -116,19 +144,22 @@ public abstract class AbstractKudu extends AbstractProcessor {
@OnScheduled @OnScheduled
public void OnScheduled(final ProcessContext context) { public void OnScheduled(final ProcessContext context) {
try { try {
tableName = context.getProperty(TABLE_NAME).getValue(); tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
kuduMasters = context.getProperty(KUDU_MASTERS).getValue(); kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
if(kuduClient == null) { if(kuduClient == null) {
getLogger().debug("Setting up Kudu connection..."); getLogger().debug("Setting up Kudu connection...");
kuduClient = getKuduConnection(kuduMasters); kuduClient = getKuduConnection(kuduMasters);
kuduTable = this.getKuduTable(kuduClient, tableName); kuduTable = this.getKuduTable(kuduClient, tableName);
getLogger().debug("Kudu connection successfully initialized"); getLogger().debug("Kudu connection successfully initialized");
} }
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
} catch(KuduException ex){ } catch(KuduException ex){
getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
} }
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
} }
@OnStopped @OnStopped
@ -223,6 +254,10 @@ public abstract class AbstractKudu extends AbstractProcessor {
protected KuduSession getKuduSession(KuduClient client){ protected KuduSession getKuduSession(KuduClient client){
KuduSession kuduSession = client.newSession(); KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
kuduSession.setFlushMode(flushMode);
if(operationType == OperationType.INSERT_IGNORE){ if(operationType == OperationType.INSERT_IGNORE){
kuduSession.setIgnoreAllDuplicateRows(true); kuduSession.setIgnoreAllDuplicateRows(true);
} }

View File

@ -43,7 +43,7 @@ import java.util.Set;
@EventDriven @EventDriven
@SupportsBatching @SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "database", "NoSQL", "kudu", "HDFS"}) @Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " + @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." + "to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure") " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@ -58,6 +58,8 @@ public class PutKudu extends AbstractKudu {
properties.add(SKIP_HEAD_LINE); properties.add(SKIP_HEAD_LINE);
properties.add(RECORD_READER); properties.add(RECORD_READER);
properties.add(INSERT_OPERATION); properties.add(INSERT_OPERATION);
properties.add(FLUSH_MODE);
properties.add(BATCH_SIZE);
return properties; return properties;
} }