NIFI-1868: Incorporate PutHiveStreaming review comments

This closes #706.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2016-08-04 10:03:17 -04:00 committed by Bryan Bende
parent 59659232c7
commit 3943d72e95
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 433 additions and 104 deletions

View File

@ -18,8 +18,12 @@ package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -27,6 +31,7 @@ import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -60,6 +65,7 @@ import org.json.JSONObject;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -73,17 +79,21 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
* This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table. * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
*/ */
@TriggerSerially
@Tags({"hive", "streaming", "put", "database", "store"}) @Tags({"hive", "streaming", "put", "database", "store"})
@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in " @CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
+ "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). " + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
+ "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.") + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.") @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")
}) })
public class PutHiveStreaming extends AbstractProcessor { public class PutHiveStreaming extends AbstractProcessor {
@ -110,6 +120,17 @@ public class PutHiveStreaming extends AbstractProcessor {
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}; };
// Metadata keys that are not transferred to split files when output strategy is datafile
// Avro will write this key/values pairs on its own
private static final Set<String> RESERVED_METADATA;
static {
Set<String> reservedMetadata = new HashSet<>();
reservedMetadata.add("avro.schema");
reservedMetadata.add("avro.codec");
RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
}
// Properties // Properties
public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder() public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
.name("hive-stream-metastore-uri") .name("hive-stream-metastore-uri")
@ -202,15 +223,20 @@ public class PutHiveStreaming extends AbstractProcessor {
// Relationships // Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated") .description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build(); .build();
public static final Relationship REL_FAILURE = new Relationship.Builder() public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure") .name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.") .description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
+ "some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. "
+ "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
+ "can be used to provide a retry capability since full rollback is not possible.")
.build(); .build();
private final static List<PropertyDescriptor> propertyDescriptors; private final static List<PropertyDescriptor> propertyDescriptors;
@ -333,9 +359,14 @@ public class PutHiveStreaming extends AbstractProcessor {
} }
final ComponentLog log = getLogger(); final ComponentLog log = getLogger();
try { final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
final List<String> partitionColumnList; final List<String> partitionColumnList;
String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue(); final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
if (StringUtils.isEmpty(partitionColumns)) { if (StringUtils.isEmpty(partitionColumns)) {
partitionColumnList = Collections.emptyList(); partitionColumnList = Collections.emptyList();
} else { } else {
@ -346,22 +377,48 @@ public class PutHiveStreaming extends AbstractProcessor {
} }
} }
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore) final AtomicInteger recordCount = new AtomicInteger(0);
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); final AtomicInteger successfulRecordCount = new AtomicInteger(0);
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
final FlowFile inputFlowFile = flowFile;
final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
int recordCount = 0; // Create output flow files and their Avro writers
final List<HiveStreamingRecord> records = new LinkedList<>(); AtomicReference<FlowFile> successFlowFile = new AtomicReference<>(session.create(inputFlowFile));
final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
AtomicReference<FlowFile> failureFlowFile = new AtomicReference<>(session.create(inputFlowFile));
final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
session.read(flowFile, in -> { try {
session.read(inputFlowFile, in -> {
try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
GenericRecord currRecord = null;
// Copy codec and schema information to all writers
final String codec = reader.getMetaString(DataFileConstants.CODEC) == null
? DataFileConstants.NULL_CODEC
: reader.getMetaString(DataFileConstants.CODEC);
Arrays.asList(successAvroWriter, failureAvroWriter)
.forEach((writer) -> {
writer.setCodec(CodecFactory.fromString(codec));
// Transfer metadata (this is a subset of the incoming file)
for (String metaKey : reader.getMetaKeys()) {
if (!RESERVED_METADATA.contains(metaKey)) {
writer.setMeta(metaKey, reader.getMeta(metaKey));
}
}
});
GenericRecord currRecord;
while (reader.hasNext()) { while (reader.hasNext()) {
currRecord = reader.next(); currRecord = reader.next(currRecord);
recordCount.incrementAndGet();
// Extract the partition values (they must be put separately into the Hive Streaming API)
List<String> partitionValues = new ArrayList<>(); List<String> partitionValues = new ArrayList<>();
try {
for (String partition : partitionColumnList) { for (String partition : partitionColumnList) {
Object partitionValue = currRecord.get(partition); Object partitionValue = currRecord.get(partition);
if (partitionValue == null) { if (partitionValue == null) {
@ -369,10 +426,18 @@ public class PutHiveStreaming extends AbstractProcessor {
} }
partitionValues.add(partitionValue.toString()); partitionValues.add(partitionValue.toString());
} }
} catch (IOException ioe) {
// Add the failed record to the failure flow file
log.error("Error writing record to Hive Streaming transaction", ioe);
appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
failureFlowFile, failureAvroWriter, reader);
continue;
}
List<Schema.Field> fields = currRecord.getSchema().getFields(); List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) { if (fields != null) {
JSONObject obj = new JSONObject(); JSONObject obj = new JSONObject();
try {
for (Schema.Field field : fields) { for (Schema.Field field : fields) {
String fieldName = field.name(); String fieldName = field.name();
// Skip fields that are partition columns, we extracted those values above to create an EndPoint // Skip fields that are partition columns, we extracted those values above to create an EndPoint
@ -385,53 +450,189 @@ public class PutHiveStreaming extends AbstractProcessor {
} }
} }
} }
records.add(new HiveStreamingRecord(partitionValues, obj)); } catch (IOException ioe) {
// This really shouldn't happen since we are iterating over the schema fields, but just in case,
// add the failed record to the failure flow file.
log.error("Error writing record to Hive Streaming transaction", ioe);
appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
failureFlowFile, failureAvroWriter, reader);
continue;
}
final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord);
HiveEndPoint endPoint = null;
HiveWriter hiveWriter = null;
try {
endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
hiveWriter = getOrCreateWriter(endPoint);
} catch (ConnectionError
| HiveWriter.ConnectFailure
| InterruptedException connectionError) {
// Can't connect to Hive endpoint.
log.error("Error connecting to Hive endpoint: table {} at {}",
new Object[]{options.getTableName(), options.getMetaStoreURI()});
// If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry
abortAndCloseWriters();
throw new ProcessException(connectionError);
}
try {
try {
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
successfulRecords.add(record);
} catch (InterruptedException | HiveWriter.WriteFailure wf) {
// Add the failed record to the failure flow file
log.error("Error writing record to Hive Streaming transaction", wf);
appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
}
// If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records
if (hiveWriter.getTotalRecords() >= txnsPerBatch) {
hiveWriter.flush(true);
// Now send the records to the success relationship and update the success count
try {
appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader);
successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr);
// Clear the list of successful records, we'll use it at the end when we flush whatever records are left
successfulRecords.clear();
} catch (IOException ioe) {
// The records were put to Hive Streaming successfully, but there was an error while writing the
// Avro records to the flow file. Log as an error and move on.
getLogger().error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file", ioe);
} }
} }
} catch (InterruptedException
| HiveWriter.CommitFailure
| HiveWriter.TxnBatchFailure
| HiveWriter.TxnFailure
| SerializationError writeException) {
log.error("Error writing record to Hive Streaming transaction", writeException);
// Add the failed record to the failure flow file
appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
if (!(writeException instanceof SerializationError)) {
try {
hiveWriter.abort();
} catch (Exception e) {
// Can't even abort properly, throw a process exception
throw new ProcessException(e);
}
}
}
}
}
try {
// Finish any transactions
flushAllWriters(true);
closeAllWriters();
// Now send any remaining records to the success relationship and update the count
appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader);
successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr);
successfulRecords.clear();
} catch (HiveWriter.CommitFailure
| HiveWriter.TxnBatchFailure
| HiveWriter.TxnFailure
| InterruptedException e) {
// If any records are in the successfulRecords list but ended up here, then they actually weren't transferred successfully, so
// route them to failure instead
appendRecordsToFlowFile(session, successfulRecords, failureFlowFile, failureAvroWriter, reader);
}
} catch (IOException ioe) {
// The Avro file is invalid (or may not be an Avro file at all), send it to failure
log.error("The incoming flow file can not be read as an Avro file, routing to failure", ioe);
session.transfer(inputFlowFile, REL_FAILURE);
incomingFlowFileTransferred.set(true);
} }
}); });
// Write all records to Hive Streaming
for (HiveStreamingRecord record : records) { if (recordCount.get() > 0) {
HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options); if (successfulRecordCount.get() > 0) {
HiveWriter writer = getOrCreateWriter(endPoint); // Transfer the flow file with successful records
writer.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); successFlowFile.set(
recordCount++; session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get())));
session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI());
session.transfer(successFlowFile.get(), REL_SUCCESS);
} else {
session.remove(successFlowFile.get());
} }
flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount)); if (recordCount.get() != successfulRecordCount.get()) {
flushAllWriters(true); // There were some failed records, so transfer that flow file to failure
failureFlowFile.set(
session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR,
Integer.toString(recordCount.get() - successfulRecordCount.get())));
session.transfer(failureFlowFile.get(), REL_FAILURE);
} else {
session.remove(failureFlowFile.get());
}
} else {
// No records were processed, so remove the output flow files
session.remove(successFlowFile.get());
session.remove(failureFlowFile.get());
}
successFlowFile.set(null);
failureFlowFile.set(null);
session.getProvenanceReporter().send(flowFile, options.getMetaStoreURI()); // If we got here, we've processed the outgoing flow files correctly, so remove the incoming one if necessary
session.transfer(flowFile, REL_SUCCESS); if (!incomingFlowFileTransferred.get()) {
session.remove(flowFile);
}
} catch (ProcessException pe) {
abortAndCloseWriters();
Throwable t = pe.getCause();
if (t != null) {
if (t instanceof ConnectionError
|| t instanceof HiveWriter.ConnectFailure
|| t instanceof HiveWriter.CommitFailure
|| t instanceof HiveWriter.TxnBatchFailure
|| t instanceof HiveWriter.TxnFailure
|| t instanceof InterruptedException) {
log.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", t);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
// Remove the ones we created
if (successFlowFile.get() != null) {
session.remove(successFlowFile.get());
}
if (failureFlowFile.get() != null) {
session.remove(failureFlowFile.get());
}
} else {
throw pe;
}
} else {
throw pe;
}
} finally {
// Restore original class loader, might not be necessary but is good practice since the processor task changed it // Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader); Thread.currentThread().setContextClassLoader(originalClassloader);
} catch (HiveWriter.CommitFailure commitFailure) {
log.error("Error committing to Hive", commitFailure);
session.transfer(flowFile, REL_FAILURE);
} catch (HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure txnFailure) {
log.error("Hive Streaming Transaction Failure", txnFailure);
session.transfer(flowFile, REL_FAILURE);
} catch (InterruptedException e) {
log.error("Hive Streaming Interrupted, flow file will be penalized and routed to retry", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (ConnectionError | HiveWriter.ConnectFailure ce) {
log.error("Error while connecting via Hive Streaming, flow file will be penalized and routed to retry", ce);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (SerializationError se) {
log.error("Serialization exception occurred, record not written to Hive.", se);
session.transfer(flowFile, REL_FAILURE);
} catch (HiveWriter.WriteFailure wf) {
log.error("Error while writing record to Hive Streaming", wf);
abortAndCloseWriters();
session.transfer(flowFile, REL_FAILURE);
} }
} }
private void appendRecordsToFlowFile(ProcessSession session,
List<HiveStreamingRecord> records,
AtomicReference<FlowFile> appendFlowFile,
DataFileWriter<GenericRecord> avroWriter,
DataFileStream<GenericRecord> reader) throws IOException {
appendFlowFile.set(session.append(appendFlowFile.get(), (out) -> {
try (DataFileWriter<GenericRecord> writer = avroWriter.create(reader.getSchema(), out)) {
for (HiveStreamingRecord sRecord : records) {
writer.append(sRecord.getRecord());
}
writer.flush();
}
}));
}
@OnStopped @OnStopped
public void cleanup() { public void cleanup() {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
@ -637,9 +838,9 @@ public class PutHiveStreaming extends AbstractProcessor {
protected class HiveStreamingRecord { protected class HiveStreamingRecord {
private List<String> partitionValues; private List<String> partitionValues;
private JSONObject record; private GenericRecord record;
public HiveStreamingRecord(List<String> partitionValues, JSONObject record) { public HiveStreamingRecord(List<String> partitionValues, GenericRecord record) {
this.partitionValues = partitionValues; this.partitionValues = partitionValues;
this.record = record; this.record = record;
} }
@ -648,7 +849,7 @@ public class PutHiveStreaming extends AbstractProcessor {
return partitionValues; return partitionValues;
} }
public JSONObject getRecord() { public GenericRecord getRecord() {
return record; return record;
} }

View File

@ -36,9 +36,6 @@ public class HiveUtils {
private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class); private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError { public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
if(partitionVals==null) {
return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
}
return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals); return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
} }

View File

@ -74,7 +74,7 @@ public class HiveWriter {
this.txnBatch = nextTxnBatch(recordWriter); this.txnBatch = nextTxnBatch(recordWriter);
this.closed = false; this.closed = false;
this.lastUsed = System.currentTimeMillis(); this.lastUsed = System.currentTimeMillis();
} catch (InterruptedException | RuntimeException e) { } catch (InterruptedException | RuntimeException | ConnectFailure e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
throw new ConnectFailure(endPoint, e); throw new ConnectFailure(endPoint, e);

View File

@ -17,8 +17,10 @@
package org.apache.nifi.processors.hive; package org.apache.nifi.processors.hive;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DatumWriter;
@ -31,17 +33,19 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveWriter; import org.apache.nifi.util.hive.HiveWriter;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -49,6 +53,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import static org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -57,8 +67,8 @@ import static org.mockito.Mockito.when;
*/ */
public class TestPutHiveStreaming { public class TestPutHiveStreaming {
TestRunner runner; private TestRunner runner;
MockPutHiveStreaming processor; private MockPutHiveStreaming processor;
private KerberosProperties kerberosPropsWithFile; private KerberosProperties kerberosPropsWithFile;
private KerberosProperties kerberosPropsWithoutFile; private KerberosProperties kerberosPropsWithoutFile;
@ -84,12 +94,6 @@ public class TestPutHiveStreaming {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
} }
@After
public void tearDown() throws Exception {
}
@Test @Test
public void testSetup() throws Exception { public void testSetup() throws Exception {
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
@ -126,6 +130,17 @@ public class TestPutHiveStreaming {
runner.run(); runner.run();
} }
@Test
public void testSingleBatchInvalid() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
runner.assertValid();
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "1");
runner.assertNotValid();
}
@Test @Test
public void onTrigger() throws Exception { public void onTrigger() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
@ -142,7 +157,76 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
}
@Test
public void onTriggerBadInput() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
runner.enqueue("I am not an Avro record".getBytes());
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
}
@Test
public void onTriggerMultipleRecords() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
Map<String, Object> user2 = new HashMap<String, Object>() {
{
put("name", "Mary");
put("favorite_number", 42);
}
};
Map<String, Object> user3 = new HashMap<String, Object>() {
{
put("name", "Matt");
put("favorite_number", 3);
}
};
runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
assertNotNull(resultFlowFile);
assertEquals("3", resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
final DataFileStream<GenericRecord> reader = new DataFileStream<>(
new ByteArrayInputStream(resultFlowFile.toByteArray()),
new GenericDatumReader<GenericRecord>());
Schema schema = reader.getSchema();
// Verify that the schema is preserved
assertTrue(schema.equals(new Schema.Parser().parse(new File("src/test/resources/user.avsc"))));
// Verify the records are intact. We can't guarantee order so check the total number and non-null fields
assertTrue(reader.hasNext());
GenericRecord record = reader.next(null);
assertNotNull(record.get("name"));
assertNotNull(record.get("favorite_number"));
assertNull(record.get("favorite_color"));
assertNull(record.get("scale"));
assertTrue(reader.hasNext());
record = reader.next(record);
assertTrue(reader.hasNext());
reader.next(record);
assertFalse(reader.hasNext());
} }
@Test @Test
@ -165,7 +249,35 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
}
@Test
public void onTriggerWithPartitionColumnsNotInRecord() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
put("favorite_color", "blue");
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
} }
@Test @Test
@ -186,7 +298,9 @@ public class TestPutHiveStreaming {
} }
runner.run(10); runner.run(10);
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 10);
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
} }
@Test @Test
@ -210,7 +324,9 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(1, true); runner.run(1, true);
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 2);
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
} }
@Test @Test
@ -230,7 +346,9 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY); runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
} }
@Test @Test
@ -250,7 +368,7 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY); runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
} }
@Test @Test
@ -267,10 +385,17 @@ public class TestPutHiveStreaming {
put("favorite_number", 146); put("favorite_number", 146);
} }
}; };
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); Map<String, Object> user2 = new HashMap<String, Object>() {
{
put("name", "Mary");
put("favorite_number", 42);
}
};
runner.enqueue(createAvroRecord(Arrays.asList(user1, user2)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
assertEquals("2", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
} }
@Test @Test
@ -290,7 +415,8 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
} }
@Test @Test
@ -310,7 +436,9 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
} }
@Test @Test
@ -330,7 +458,9 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
} }
@Test @Test
@ -377,7 +507,6 @@ public class TestPutHiveStreaming {
user.put("favorite_color", record.get("favorite_color")); user.put("favorite_color", record.get("favorite_color"));
users.add(user); users.add(user);
} }
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
@ -387,6 +516,7 @@ public class TestPutHiveStreaming {
} }
} }
return out.toByteArray(); return out.toByteArray();
} }
private class MockPutHiveStreaming extends PutHiveStreaming { private class MockPutHiveStreaming extends PutHiveStreaming {

View File

@ -20,6 +20,7 @@
"fields": [ "fields": [
{"name": "name", "type": "string"}, {"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]} {"name": "favorite_color", "type": ["string", "null"]},
{"name": "scale", "type": ["double", "null"]}
] ]
} }