NIFI-7740: Add Records Per Transaction and Transactions Per Batch properties to PutHive3Streaming

NIFI-7740: Incorporated review comments

NIFI-7740: Restore RecordsEOFException superclass to SerializationError

This closes #4489.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Matthew Burgess, 2020-08-24 21:45:00 -04:00 (committed by Peter Turcsanyi)
parent 943904c12c
commit 45470b0984
5 changed files with 106 additions and 20 deletions

HiveRecordWriter.java

@@ -34,14 +34,17 @@ import java.util.Properties;
 public class HiveRecordWriter extends AbstractRecordWriter {
-    private RecordReader recordReader;
+    private final RecordReader recordReader;
     private NiFiRecordSerDe serde;
-    private ComponentLog log;
+    private final ComponentLog log;
+    private final int recordsPerTransaction;
+    private int currentRecordsWritten;

-    public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+    public HiveRecordWriter(RecordReader recordReader, ComponentLog log, final int recordsPerTransaction) {
         super(null);
         this.recordReader = recordReader;
         this.log = log;
+        this.recordsPerTransaction = recordsPerTransaction;
     }
@Override
@@ -73,10 +76,16 @@ public class HiveRecordWriter extends AbstractRecordWriter {
     public void write(long writeId, InputStream inputStream) throws StreamingException {
         // The inputStream is already available to the recordReader, so just iterate through the records
         try {
-            Record record;
-            while ((record = recordReader.nextRecord()) != null) {
+            Record record = null;
+            while ((++currentRecordsWritten <= recordsPerTransaction || recordsPerTransaction == 0)
+                    && (record = recordReader.nextRecord()) != null) {
                 write(writeId, record);
             }
+            // Once there are no more records, throw a RecordsEOFException to indicate the input stream is exhausted
+            if (record == null) {
+                throw new RecordsEOFException("End of transaction", new Exception());
+            }
+            currentRecordsWritten = 0;
         } catch (MalformedRecordException | IOException e) {
             throw new StreamingException(e.getLocalizedMessage(), e);
         }
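For reference, the gating condition above can be read in isolation. Below is a minimal, self-contained sketch of the same loop shape (all names are illustrative, not NiFi or Hive API): a limit of 0 disables the cap, otherwise at most `limit` records are consumed before control returns to the caller, which commits and opens the next transaction.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class TxnGateSketch {
        public static void main(String[] args) {
            Queue<String> records = new ArrayDeque<>();
            for (int i = 1; i <= 7; i++) {
                records.add("record-" + i);
            }
            final int limit = 3; // stand-in for recordsPerTransaction; 0 would mean "no cap"
            while (!records.isEmpty()) {
                int written = 0;
                String record = null;
                // Mirrors the loop shape in HiveRecordWriter.write(): the cap is checked
                // before the next record is pulled, so a record is never read and then discarded.
                while ((++written <= limit || limit == 0) && (record = records.poll()) != null) {
                    System.out.println("txn write: " + record);
                }
                // record == null here corresponds to the RecordsEOFException signal above
                System.out.println(record == null ? "-- input exhausted --" : "-- txn boundary --");
            }
        }
    }

With 7 records and a limit of 3, this prints two full transactions of 3 records and a final transaction of 1, matching the intended behavior of the processor loop.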

RecordsEOFException.java (new file)

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+/**
+ * This is a "marker class" used by the HiveRecordWriter to indicate there are no more records in the input stream.
+ * It is used by PutHive3Streaming to determine that all records have been written to transaction(s).
+ */
+public class RecordsEOFException extends SerializationError {
+
+    RecordsEOFException(String msg, Exception e) {
+        super(msg, e);
+    }
+}
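Because RecordsEOFException extends SerializationError, any caller that wants to treat end-of-input differently from a genuine serialization failure must catch the subclass first. Here is a self-contained sketch of that ordering concern, using stand-in exception classes (EofSignal and BaseError are illustrative names, not Hive API):

    public class MarkerExceptionSketch {
        // EofSignal plays the role of RecordsEOFException; BaseError plays SerializationError.
        static class BaseError extends Exception {
            BaseError(String msg) { super(msg); }
        }

        static class EofSignal extends BaseError {
            EofSignal(String msg) { super(msg); }
        }

        static void write(boolean exhausted) throws BaseError {
            if (exhausted) {
                throw new EofSignal("End of transaction"); // expected control-flow signal
            }
            throw new BaseError("real serialization failure");
        }

        public static void main(String[] args) {
            for (boolean exhausted : new boolean[]{true, false}) {
                try {
                    write(exhausted);
                } catch (EofSignal eof) { // the subclass must be caught before its superclass
                    System.out.println("input exhausted -> commit final transaction");
                } catch (BaseError err) {
                    System.out.println("genuine failure -> rollback / route to failure");
                }
            }
        }
    }

Note also that the constructor is package-private, so only classes in org.apache.hive.streaming (such as HiveRecordWriter) can raise it.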

PutHive3Streaming.java

@@ -24,6 +24,7 @@ import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveRecordWriter;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.RecordsEOFException;
 import org.apache.hive.streaming.SerializationError;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StreamingException;
@@ -171,6 +172,28 @@ public class PutHive3Streaming extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();

+    static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+            .name("hive3-stream-records-per-transaction")
+            .displayName("Records per Transaction")
+            .description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("0")
+            .build();
+
+    static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive3-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) "
+                    + "and Transactions per Batch should be larger than the largest expected number of records in the flow file(s); this will ensure any failed "
+                    + "transaction batches cause a full rollback.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
             .name("hive3-stream-call-timeout")
             .displayName("Call Timeout")
@@ -269,6 +292,8 @@ public class PutHive3Streaming extends AbstractProcessor {
         props.add(DB_NAME);
         props.add(TABLE_NAME);
         props.add(STATIC_PARTITION_VALUES);
+        props.add(RECORDS_PER_TXN);
+        props.add(TXNS_PER_BATCH);
         props.add(CALL_TIMEOUT);
         props.add(DISABLE_STREAMING_OPTIMIZATIONS);
         props.add(ROLLBACK_ON_FAILURE);
@@ -355,10 +380,10 @@ public class PutHive3Streaming extends AbstractProcessor {
         if (resolvedKeytab != null) {
             kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
-            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
         } else if (explicitPassword != null) {
             kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
-            log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
+            log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
         } else {
             throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
         }
@@ -409,13 +434,17 @@ public class PutHive3Streaming extends AbstractProcessor {
         // Override the Hive Metastore URIs in the config if set by the user
         if (metastoreURIs != null) {
-           hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+            hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
         }

+        final int recordsPerTransaction = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
+        final int transactionsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
         HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
                 .withHiveConf(hiveConfig)
                 .withCallTimeout(callTimeout)
-                .withStreamingOptimizations(!disableStreamingOptimizations);
+                .withStreamingOptimizations(!disableStreamingOptimizations)
+                .withTransactionBatchSize(transactionsPerBatch);

         if (!StringUtils.isEmpty(staticPartitionValuesString)) {
             List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
@@ -444,7 +473,7 @@ public class PutHive3Streaming extends AbstractProcessor {
         try {
             final RecordReader reader;

-            try(final InputStream in = session.read(flowFile)) {
+            try (final InputStream in = session.read(flowFile)) {
                 // if we fail to create the RecordReader then we want to route to failure, so we need to
                 // handle this separately from the other IOExceptions which normally route to retry
                 try {
@@ -453,12 +482,21 @@ public class PutHive3Streaming extends AbstractProcessor {
                     throw new RecordReaderFactoryException("Unable to create RecordReader", e);
                 }

-                hiveStreamingConnection = makeStreamingConnection(options, reader);
+                hiveStreamingConnection = makeStreamingConnection(options, reader, recordsPerTransaction);

                 // Write records to Hive streaming, then commit and close
-                hiveStreamingConnection.beginTransaction();
-                hiveStreamingConnection.write(in);
-                hiveStreamingConnection.commitTransaction();
+                boolean exitLoop = false;
+                while (!exitLoop) {
+                    hiveStreamingConnection.beginTransaction();
+                    // The HiveRecordWriter keeps track of records per transaction and will complete writing for the transaction
+                    // once the limit has been reached. It is then reset for the next iteration of the loop.
+                    try {
+                        hiveStreamingConnection.write(in);
+                    } catch (RecordsEOFException reofe) {
+                        exitLoop = true;
+                    }
+                    hiveStreamingConnection.commitTransaction();
+                }
                 in.close();

                 Map<String, String> updateAttributes = new HashMap<>();
@@ -560,13 +598,14 @@ public class PutHive3Streaming extends AbstractProcessor {
         });
     }

-    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
         return HiveStreamingConnection.newBuilder()
                 .withDatabase(options.getDatabaseName())
                 .withTable(options.getTableName())
                 .withStaticPartitionValues(options.getStaticPartitionValues())
                 .withHiveConf(options.getHiveConf())
-                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+                .withRecordWriter(new HiveRecordWriter(reader, getLogger(), recordsPerTransaction))
+                .withTransactionBatchSize(options.getTransactionBatchSize())
                 .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
                         + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
                 .connect();
@@ -642,7 +681,7 @@ public class PutHive3Streaming extends AbstractProcessor {
             KerberosUser kerberosUser = kerberosUserReference.get();
             getLogger().debug("kerberosUser is " + kerberosUser);
             try {
-                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
                 kerberosUser.checkTGTAndRelogin();
             } catch (LoginException e) {
                 throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);

HiveOptions.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -36,6 +36,7 @@ public class HiveOptions implements Serializable {
     protected String kerberosKeytab;
     protected HiveConf hiveConf;
     protected boolean streamingOptimizations = true;
+    protected int transactionBatchSize = 1;

     public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
         this.metaStoreURI = metaStoreURI;
@@ -73,6 +74,11 @@ public class HiveOptions implements Serializable {
         return this;
     }

+    public HiveOptions withTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+        return this;
+    }
+
     public String getMetaStoreURI() {
         return metaStoreURI;
     }
@@ -108,4 +114,8 @@ public class HiveOptions implements Serializable {
     public boolean getStreamingOptimizations() {
         return streamingOptimizations;
     }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
 }
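A hedged usage sketch of the extended fluent builder (the metastore URI, database, and table names are placeholders, and OptionsSketch is an illustrative class name):

    import org.apache.nifi.util.hive.HiveOptions;

    public class OptionsSketch {
        public static void main(String[] args) {
            HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "click_events")
                    .withCallTimeout(30000)
                    .withStreamingOptimizations(true)
                    .withTransactionBatchSize(10);
            System.out.println(options.getTransactionBatchSize()); // prints 10
        }
    }

Defaulting transactionBatchSize to 1 keeps existing flows on the previous behavior of a single transaction per batch unless the new property is set.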

TestPutHive3Streaming.java

@@ -1142,7 +1142,7 @@ public class TestPutHive3Streaming {
         }

         @Override
-        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
             // Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set)
             String userDefinedMetastoreURI = options.getMetaStoreURI();
@@ -1154,7 +1154,7 @@ public class TestPutHive3Streaming {
                 throw new StubConnectionError("Unit Test - Connection Error");
             }

-            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger());
+            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger(), 0);
             if (generatePermissionsFailure) {
                 throw new StreamingException("Permission denied");
             }
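Passing 0 for recordsPerTransaction in the test stub preserves the legacy behavior (all records written in a single transaction), so the existing tests continue to exercise the single-transaction path.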