diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index edb33dca6f..794b26884f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -219,6 +219,16 @@ public class PutHiveStreaming extends AbstractProcessor {
             .defaultValue("100")
             .build();
 
+    public static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+            .name("hive-stream-records-per-transaction")
+            .displayName("Records per Transaction")
+            .description("Number of records to process before committing the transaction. This value must be greater than 1.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(GREATER_THAN_ONE_VALIDATOR)
+            .defaultValue("10000")
+            .build();
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -270,6 +280,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         props.add(MAX_OPEN_CONNECTIONS);
         props.add(HEARTBEAT_INTERVAL);
         props.add(TXNS_PER_BATCH);
+        props.add(RECORDS_PER_TXN);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
         kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -357,7 +368,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         }
 
         final ComponentLog log = getLogger();
-        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
 
         // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
         ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
@@ -482,8 +493,8 @@ public class PutHiveStreaming extends AbstractProcessor {
                     appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
                 }
 
-                // If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records
-                if (hiveWriter.getTotalRecords() >= txnsPerBatch) {
+                // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
+                if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
                     hiveWriter.flush(true);
                     // Now send the records to the success relationship and update the success count
                     try {
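
The new property is wired to a GREATER_THAN_ONE_VALIDATOR that the diff references but does not define. As a point of reference, the following is a minimal sketch, not the actual definition from the NiFi codebase, of what such a validator could look like against NiFi's org.apache.nifi.components.Validator interface; the enclosing class name is hypothetical. It defers validation when Expression Language is present (since the property declares expressionLanguageSupported(true)) and otherwise accepts only integers strictly greater than 1.

import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;

public class GreaterThanOneValidatorSketch {

    public static final Validator GREATER_THAN_ONE_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
            // If the property supports Expression Language and the value contains an EL
            // statement, defer validation until the expression is evaluated at runtime
            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .explanation("Expression Language Present")
                        .valid(true)
                        .build();
            }
            try {
                // Valid only if the value parses as an integer strictly greater than 1
                final int value = Integer.parseInt(input);
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .valid(value > 1)
                        .explanation(value > 1 ? null : "must be an integer greater than 1")
                        .build();
            } catch (final NumberFormatException e) {
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .valid(false)
                        .explanation("not a valid integer")
                        .build();
            }
        }
    };
}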