diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
index 6edb374c7a..d1b55e9650 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
@@ -34,14 +34,17 @@
 import java.util.Properties;
 
 public class HiveRecordWriter extends AbstractRecordWriter {
-    private RecordReader recordReader;
+    private final RecordReader recordReader;
     private NiFiRecordSerDe serde;
-    private ComponentLog log;
+    private final ComponentLog log;
+    private final int recordsPerTransaction;
+    private int currentRecordsWritten;
 
-    public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+    public HiveRecordWriter(RecordReader recordReader, ComponentLog log, final int recordsPerTransaction) {
         super(null);
         this.recordReader = recordReader;
         this.log = log;
+        this.recordsPerTransaction = recordsPerTransaction;
     }
 
     @Override
@@ -73,10 +76,16 @@ public class HiveRecordWriter extends AbstractRecordWriter {
     public void write(long writeId, InputStream inputStream) throws StreamingException {
         // The inputStream is already available to the recordReader, so just iterate through the records
         try {
-            Record record;
-            while ((record = recordReader.nextRecord()) != null) {
+            Record record = null;
+            while ((++currentRecordsWritten <= recordsPerTransaction || recordsPerTransaction == 0)
+                    && (record = recordReader.nextRecord()) != null) {
                 write(writeId, record);
             }
+            // Once there are no more records, throw a RecordsEOFException to indicate the input stream is exhausted
+            if (record == null) {
+                throw new RecordsEOFException("End of transaction", new Exception());
+            }
+            currentRecordsWritten = 0;
         } catch (MalformedRecordException | IOException e) {
             throw new StreamingException(e.getLocalizedMessage(), e);
         }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
new file mode 100644
index 0000000000..41e641b786
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+/**
+ * This is a "marker class" used by the HiveRecordWriter to indicate there are no more records in the input stream.
+ * It is used by PutHive3Streaming to determine that all records have been written to transaction(s).
+ */
+public class RecordsEOFException extends SerializationError {
+
+    RecordsEOFException(String msg, Exception e) {
+        super(msg, e);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 23b873f351..0ba6bd2c29 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -24,6 +24,7 @@ import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveRecordWriter;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.RecordsEOFException;
 import org.apache.hive.streaming.SerializationError;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StreamingException;
@@ -171,6 +172,28 @@ public class PutHive3Streaming extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+            .name("hive3-stream-records-per-transaction")
+            .displayName("Records per Transaction")
+            .description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("0")
+            .build();
+
+    static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive3-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) "
+                    + "and Transactions per Batch should be larger than the largest expected number of records in the flow file(s); this ensures that any failed "
+                    + "transaction batch causes a full rollback.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
             .name("hive3-stream-call-timeout")
             .displayName("Call Timeout")
@@ -269,6 +292,8 @@ public class PutHive3Streaming extends AbstractProcessor {
         props.add(DB_NAME);
         props.add(TABLE_NAME);
         props.add(STATIC_PARTITION_VALUES);
+        props.add(RECORDS_PER_TXN);
+        props.add(TXNS_PER_BATCH);
         props.add(CALL_TIMEOUT);
         props.add(DISABLE_STREAMING_OPTIMIZATIONS);
         props.add(ROLLBACK_ON_FAILURE);
@@ -355,10 +380,10 @@ public class PutHive3Streaming extends AbstractProcessor {
 
             if (resolvedKeytab != null) {
                 kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
-                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
             } else if (explicitPassword != null) {
                 kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
-                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
+                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
             } else {
                 throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
             }
@@ -409,13 +434,17 @@ public class PutHive3Streaming extends AbstractProcessor {
 
         // Override the Hive Metastore URIs in the config if set by the user
         if (metastoreURIs != null) {
-                hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+            hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
         }
 
+        final int recordsPerTransaction = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
+        final int transactionsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
+
         HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
                 .withHiveConf(hiveConfig)
                 .withCallTimeout(callTimeout)
-                .withStreamingOptimizations(!disableStreamingOptimizations);
+                .withStreamingOptimizations(!disableStreamingOptimizations)
+                .withTransactionBatchSize(transactionsPerBatch);
 
         if (!StringUtils.isEmpty(staticPartitionValuesString)) {
             List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
@@ -444,7 +473,7 @@ public class PutHive3Streaming extends AbstractProcessor {
         try {
             final RecordReader reader;
 
-            try(final InputStream in = session.read(flowFile)) {
+            try (final InputStream in = session.read(flowFile)) {
                 // if we fail to create the RecordReader then we want to route to failure, so we need to
                 // handle this separately from the other IOExceptions which normally route to retry
                 try {
@@ -453,12 +482,21 @@ public class PutHive3Streaming extends AbstractProcessor {
                     reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
                 } catch (Exception e) {
                     throw new RecordReaderFactoryException("Unable to create RecordReader", e);
                 }
-                hiveStreamingConnection = makeStreamingConnection(options, reader);
+                hiveStreamingConnection = makeStreamingConnection(options, reader, recordsPerTransaction);
                 // Write records to Hive streaming, then commit and close
-                hiveStreamingConnection.beginTransaction();
-                hiveStreamingConnection.write(in);
-                hiveStreamingConnection.commitTransaction();
+                boolean exitLoop = false;
+                while (!exitLoop) {
+                    hiveStreamingConnection.beginTransaction();
+                    // The HiveRecordWriter keeps track of records per transaction and will complete writing for the transaction
+                    // once the limit has been reached. It is then reset for the next iteration of the loop.
+                    try {
+                        hiveStreamingConnection.write(in);
+                    } catch (RecordsEOFException reofe) {
+                        exitLoop = true;
+                    }
+                    hiveStreamingConnection.commitTransaction();
+                }
                 in.close();
 
                 Map<String, String> updateAttributes = new HashMap<>();
@@ -560,13 +598,14 @@ public class PutHive3Streaming extends AbstractProcessor {
         });
     }
 
-    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
         return HiveStreamingConnection.newBuilder()
                 .withDatabase(options.getDatabaseName())
                 .withTable(options.getTableName())
                 .withStaticPartitionValues(options.getStaticPartitionValues())
                 .withHiveConf(options.getHiveConf())
-                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+                .withRecordWriter(new HiveRecordWriter(reader, getLogger(), recordsPerTransaction))
+                .withTransactionBatchSize(options.getTransactionBatchSize())
                 .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier() + "] thread " + Thread.currentThread().getId()
                         + "[" + Thread.currentThread().getName() + "]")
                 .connect();
@@ -642,7 +681,7 @@ public class PutHive3Streaming extends AbstractProcessor {
         KerberosUser kerberosUser = kerberosUserReference.get();
         getLogger().debug("kerberosUser is " + kerberosUser);
         try {
-            getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+            getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
             kerberosUser.checkTGTAndRelogin();
         } catch (LoginException e) {
             throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
index 82f6856160..7efa106ac7 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -36,6 +36,7 @@ public class HiveOptions implements Serializable {
     protected String kerberosKeytab;
     protected HiveConf hiveConf;
     protected boolean streamingOptimizations = true;
+    protected int transactionBatchSize = 1;
 
     public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
         this.metaStoreURI = metaStoreURI;
@@ -73,6 +74,11 @@ public class HiveOptions implements Serializable {
         return this;
     }
 
+    public HiveOptions withTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+        return this;
+    }
+
     public String getMetaStoreURI() {
         return metaStoreURI;
     }
@@ -108,4 +114,8 @@ public class HiveOptions implements Serializable {
     public boolean getStreamingOptimizations() {
         return streamingOptimizations;
     }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 2b6487e1e6..05e44fb5cb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -1142,7 +1142,7 @@ public class TestPutHive3Streaming {
         }
 
         @Override
-        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
             // Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set)
             String userDefinedMetastoreURI = options.getMetaStoreURI();
 
@@ -1154,7 +1154,7 @@ public class TestPutHive3Streaming {
                 throw new StubConnectionError("Unit Test - Connection Error");
             }
 
-            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger());
+            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger(), 0);
             if (generatePermissionsFailure) {
                 throw new StreamingException("Permission denied");
             }
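
Reviewer note: the heart of this change is the handshake between the new while (!exitLoop) loop in onTrigger() and HiveRecordWriter, which now stops after Records per Transaction records and signals a RecordsEOFException once the input is exhausted. Below is a minimal, self-contained sketch of that protocol for sanity-checking the loop semantics outside NiFi and Hive; ChunkWriter and EofSignal are hypothetical stand-ins for HiveRecordWriter and RecordsEOFException, and nothing here touches the real Hive Streaming API.

import java.util.Iterator;
import java.util.List;

public class ChunkedCommitSketch {

    // Plays the role of RecordsEOFException: a marker thrown once the record
    // source is exhausted, so the caller knows to stop opening transactions.
    static class EofSignal extends Exception {
    }

    // Plays the role of HiveRecordWriter: writes at most recordsPerTransaction
    // records per call (0 means unbounded), then returns; throws EofSignal
    // when the source has no more records.
    static class ChunkWriter {
        private final Iterator<String> records;
        private final int recordsPerTransaction;
        private int written;

        ChunkWriter(Iterator<String> records, int recordsPerTransaction) {
            this.records = records;
            this.recordsPerTransaction = recordsPerTransaction;
        }

        void write(long txnId) throws EofSignal {
            while (recordsPerTransaction == 0 || ++written <= recordsPerTransaction) {
                if (!records.hasNext()) {
                    throw new EofSignal(); // mirrors RecordsEOFException("End of transaction", ...)
                }
                System.out.println("txn " + txnId + " <- " + records.next());
            }
            written = 0; // reset the per-transaction counter, as the patch does
        }
    }

    public static void main(String[] args) {
        ChunkWriter writer = new ChunkWriter(List.of("a", "b", "c", "d", "e").iterator(), 2);
        long txnId = 0;
        boolean exitLoop = false;
        // Mirrors the new loop in onTrigger(): begin, write a chunk, commit,
        // and keep going until the writer signals end-of-records.
        while (!exitLoop) {
            txnId++; // beginTransaction()
            try {
                writer.write(txnId);
            } catch (EofSignal eof) {
                exitLoop = true;
            }
            System.out.println("commit txn " + txnId); // commitTransaction()
        }
    }
}

Running the sketch commits {a, b}, {c, d} and {e} across three transactions; note that the final, partially filled transaction is still committed after EOF is signaled, matching how commitTransaction() runs even when write() ends in RecordsEOFException. Making the marker extend SerializationError (rather than StreamingException directly) presumably lets Hive treat it as a record-level condition instead of one that poisons the whole transaction batch, though the patch itself does not spell that out.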
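The sizing rule in the Transactions per Batch description is worth making concrete: with Records per Transaction = 10,000 and Transactions per Batch = 100, one transaction batch covers up to 10,000 x 100 = 1,000,000 records, so any flow file below a million records can fail and roll back as a unit. The following sketch shows the two properties wired together, assuming the nifi-mock TestRunner facilities already used by TestPutHive3Streaming; the class name and figures are illustrative only and are not part of the patch.

package org.apache.nifi.processors.hive;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Hypothetical same-package snippet (the new descriptors are package-private).
public class TxnSizingSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(PutHive3Streaming.class);
        // 10,000 records per transaction, 100 transactions per batch:
        runner.setProperty(PutHive3Streaming.RECORDS_PER_TXN, "10000");
        runner.setProperty(PutHive3Streaming.TXNS_PER_BATCH, "100");
        // Product = 1,000,000 records per batch. Keep this product above the
        // largest expected flow file so that a failure can roll back the whole
        // flow file instead of leaving earlier, already-committed batches behind.
    }
}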