NIFI-3418 Add records-per-transaction property to putHiveStreaming processor

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1455

Minor whitespace Checkstyle issue fixed
This commit is contained in:
Ben Schofield 2017-01-30 05:44:19 -05:00 committed by Matt Burgess
parent 78a0e1e18b
commit d5b139ffd4
1 changed files with 14 additions and 3 deletions

View File

@ -219,6 +219,16 @@ public class PutHiveStreaming extends AbstractProcessor {
.defaultValue("100") .defaultValue("100")
.build(); .build();
public static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
.name("hive-stream-records-per-transaction")
.displayName("Records per Transaction")
.description("Number of records to process before committing the transaction. This value must be greater than 1.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(GREATER_THAN_ONE_VALIDATOR)
.defaultValue("10000")
.build();
// Relationships // Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -270,6 +280,7 @@ public class PutHiveStreaming extends AbstractProcessor {
props.add(MAX_OPEN_CONNECTIONS); props.add(MAX_OPEN_CONNECTIONS);
props.add(HEARTBEAT_INTERVAL); props.add(HEARTBEAT_INTERVAL);
props.add(TXNS_PER_BATCH); props.add(TXNS_PER_BATCH);
props.add(RECORDS_PER_TXN);
kerberosConfigFile = context.getKerberosConfigurationFile(); kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = new KerberosProperties(kerberosConfigFile); kerberosProperties = new KerberosProperties(kerberosConfigFile);
@ -357,7 +368,7 @@ public class PutHiveStreaming extends AbstractProcessor {
} }
final ComponentLog log = getLogger(); final ComponentLog log = getLogger();
final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger(); final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore) // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
@ -482,8 +493,8 @@ public class PutHiveStreaming extends AbstractProcessor {
appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader); appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
} }
// If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
if (hiveWriter.getTotalRecords() >= txnsPerBatch) { if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
hiveWriter.flush(true); hiveWriter.flush(true);
// Now send the records to the success relationship and update the success count // Now send the records to the success relationship and update the success count
try { try {