diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index e4515da33a..c14d0aafd3 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -18,8 +18,12 @@ package org.apache.nifi.processors.hive; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -27,6 +31,7 @@ import org.apache.hive.hcatalog.streaming.ConnectionError; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -60,6 +65,7 @@ import org.json.JSONObject; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -73,17 +79,21 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; /** * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table. */ +@TriggerSerially @Tags({"hive", "streaming", "put", "database", "store"}) @CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in " + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). " - + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ") + + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.") @WritesAttributes({ - @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.") + @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' " + + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.") }) public class PutHiveStreaming extends AbstractProcessor { @@ -110,6 +120,17 @@ public class PutHiveStreaming extends AbstractProcessor { return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); }; + // Metadata keys that are not transferred to split files when output strategy is datafile + // Avro will write this key/values pairs on its own + private static final Set RESERVED_METADATA; + + static { + Set reservedMetadata = new HashSet<>(); + reservedMetadata.add("avro.schema"); + reservedMetadata.add("avro.codec"); + RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata); + } + // Properties public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder() .name("hive-stream-metastore-uri") @@ -202,15 +223,20 @@ public class PutHiveStreaming extends AbstractProcessor { // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("A FlowFile is routed to this relationship after the database is successfully updated") - .build(); - public static final Relationship REL_RETRY = new Relationship.Builder() - .name("retry") - .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.") .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.") + .description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.") + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that " + + "some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. " + + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This " + + "can be used to provide a retry capability since full rollback is not possible.") .build(); private final static List propertyDescriptors; @@ -333,105 +359,280 @@ public class PutHiveStreaming extends AbstractProcessor { } final ComponentLog log = getLogger(); - try { - final List partitionColumnList; - String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue(); - if (StringUtils.isEmpty(partitionColumns)) { - partitionColumnList = Collections.emptyList(); - } else { - String[] partitionCols = partitionColumns.split(","); - partitionColumnList = new ArrayList<>(partitionCols.length); - for (String col : partitionCols) { - partitionColumnList.add(col.trim()); - } + final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger(); + + // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore) + ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + + final List partitionColumnList; + final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue(); + if (StringUtils.isEmpty(partitionColumns)) { + partitionColumnList = Collections.emptyList(); + } else { + String[] partitionCols = partitionColumns.split(","); + partitionColumnList = new ArrayList<>(partitionCols.length); + for (String col : partitionCols) { + partitionColumnList.add(col.trim()); } + } - // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore) - ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + final AtomicInteger recordCount = new AtomicInteger(0); + final AtomicInteger successfulRecordCount = new AtomicInteger(0); + List successfulRecords = new LinkedList<>(); + final FlowFile inputFlowFile = flowFile; + final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false); - int recordCount = 0; - final List records = new LinkedList<>(); + // Create output flow files and their Avro writers + AtomicReference successFlowFile = new AtomicReference<>(session.create(inputFlowFile)); + final DataFileWriter successAvroWriter = new DataFileWriter<>(new GenericDatumWriter()); + AtomicReference failureFlowFile = new AtomicReference<>(session.create(inputFlowFile)); + final DataFileWriter failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter()); - session.read(flowFile, in -> { + try { + session.read(inputFlowFile, in -> { try (final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + GenericRecord currRecord = null; + + // Copy codec and schema information to all writers + final String codec = reader.getMetaString(DataFileConstants.CODEC) == null + ? DataFileConstants.NULL_CODEC + : reader.getMetaString(DataFileConstants.CODEC); + + Arrays.asList(successAvroWriter, failureAvroWriter) + .forEach((writer) -> { + writer.setCodec(CodecFactory.fromString(codec)); + // Transfer metadata (this is a subset of the incoming file) + for (String metaKey : reader.getMetaKeys()) { + if (!RESERVED_METADATA.contains(metaKey)) { + writer.setMeta(metaKey, reader.getMeta(metaKey)); + } + } + }); - GenericRecord currRecord; while (reader.hasNext()) { - currRecord = reader.next(); + currRecord = reader.next(currRecord); + recordCount.incrementAndGet(); + + // Extract the partition values (they must be put separately into the Hive Streaming API) List partitionValues = new ArrayList<>(); - for (String partition : partitionColumnList) { - Object partitionValue = currRecord.get(partition); - if (partitionValue == null) { - throw new IOException("Partition column '" + partition + "' not found in Avro record"); + try { + for (String partition : partitionColumnList) { + Object partitionValue = currRecord.get(partition); + if (partitionValue == null) { + throw new IOException("Partition column '" + partition + "' not found in Avro record"); + } + partitionValues.add(partitionValue.toString()); } - partitionValues.add(partitionValue.toString()); + } catch (IOException ioe) { + // Add the failed record to the failure flow file + log.error("Error writing record to Hive Streaming transaction", ioe); + appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)), + failureFlowFile, failureAvroWriter, reader); + continue; } List fields = currRecord.getSchema().getFields(); if (fields != null) { JSONObject obj = new JSONObject(); - for (Schema.Field field : fields) { - String fieldName = field.name(); - // Skip fields that are partition columns, we extracted those values above to create an EndPoint - if (!partitionColumnList.contains(fieldName)) { - Object value = currRecord.get(fieldName); + try { + for (Schema.Field field : fields) { + String fieldName = field.name(); + // Skip fields that are partition columns, we extracted those values above to create an EndPoint + if (!partitionColumnList.contains(fieldName)) { + Object value = currRecord.get(fieldName); + try { + obj.put(fieldName, value); + } catch (JSONException je) { + throw new IOException(je); + } + } + } + } catch (IOException ioe) { + // This really shouldn't happen since we are iterating over the schema fields, but just in case, + // add the failed record to the failure flow file. + log.error("Error writing record to Hive Streaming transaction", ioe); + appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)), + failureFlowFile, failureAvroWriter, reader); + continue; + } + final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord); + HiveEndPoint endPoint = null; + HiveWriter hiveWriter = null; + try { + endPoint = makeHiveEndPoint(record.getPartitionValues(), options); + hiveWriter = getOrCreateWriter(endPoint); + } catch (ConnectionError + | HiveWriter.ConnectFailure + | InterruptedException connectionError) { + // Can't connect to Hive endpoint. + log.error("Error connecting to Hive endpoint: table {} at {}", + new Object[]{options.getTableName(), options.getMetaStoreURI()}); + // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry + abortAndCloseWriters(); + throw new ProcessException(connectionError); + } + try { + try { + hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); + successfulRecords.add(record); + } catch (InterruptedException | HiveWriter.WriteFailure wf) { + // Add the failed record to the failure flow file + log.error("Error writing record to Hive Streaming transaction", wf); + appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader); + } + + // If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records + if (hiveWriter.getTotalRecords() >= txnsPerBatch) { + hiveWriter.flush(true); + // Now send the records to the success relationship and update the success count try { - obj.put(fieldName, value); - } catch (JSONException je) { - throw new IOException(je); + appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader); + successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr); + + // Clear the list of successful records, we'll use it at the end when we flush whatever records are left + successfulRecords.clear(); + + } catch (IOException ioe) { + // The records were put to Hive Streaming successfully, but there was an error while writing the + // Avro records to the flow file. Log as an error and move on. + getLogger().error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file", ioe); + } + } + + } catch (InterruptedException + | HiveWriter.CommitFailure + | HiveWriter.TxnBatchFailure + | HiveWriter.TxnFailure + | SerializationError writeException) { + + log.error("Error writing record to Hive Streaming transaction", writeException); + // Add the failed record to the failure flow file + appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader); + + if (!(writeException instanceof SerializationError)) { + try { + hiveWriter.abort(); + } catch (Exception e) { + // Can't even abort properly, throw a process exception + throw new ProcessException(e); } } } - records.add(new HiveStreamingRecord(partitionValues, obj)); } } + try { + // Finish any transactions + flushAllWriters(true); + closeAllWriters(); + + // Now send any remaining records to the success relationship and update the count + appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader); + successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr); + successfulRecords.clear(); + + } catch (HiveWriter.CommitFailure + | HiveWriter.TxnBatchFailure + | HiveWriter.TxnFailure + | InterruptedException e) { + + // If any records are in the successfulRecords list but ended up here, then they actually weren't transferred successfully, so + // route them to failure instead + appendRecordsToFlowFile(session, successfulRecords, failureFlowFile, failureAvroWriter, reader); + } + } catch (IOException ioe) { + // The Avro file is invalid (or may not be an Avro file at all), send it to failure + log.error("The incoming flow file can not be read as an Avro file, routing to failure", ioe); + session.transfer(inputFlowFile, REL_FAILURE); + incomingFlowFileTransferred.set(true); } }); - // Write all records to Hive Streaming - for (HiveStreamingRecord record : records) { - HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options); - HiveWriter writer = getOrCreateWriter(endPoint); - writer.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); - recordCount++; + + if (recordCount.get() > 0) { + if (successfulRecordCount.get() > 0) { + // Transfer the flow file with successful records + successFlowFile.set( + session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get()))); + session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI()); + session.transfer(successFlowFile.get(), REL_SUCCESS); + } else { + session.remove(successFlowFile.get()); + } + + if (recordCount.get() != successfulRecordCount.get()) { + // There were some failed records, so transfer that flow file to failure + failureFlowFile.set( + session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, + Integer.toString(recordCount.get() - successfulRecordCount.get()))); + session.transfer(failureFlowFile.get(), REL_FAILURE); + } else { + session.remove(failureFlowFile.get()); + } + } else { + // No records were processed, so remove the output flow files + session.remove(successFlowFile.get()); + session.remove(failureFlowFile.get()); + } + successFlowFile.set(null); + failureFlowFile.set(null); + + // If we got here, we've processed the outgoing flow files correctly, so remove the incoming one if necessary + if (!incomingFlowFileTransferred.get()) { + session.remove(flowFile); } - flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount)); - flushAllWriters(true); - - session.getProvenanceReporter().send(flowFile, options.getMetaStoreURI()); - session.transfer(flowFile, REL_SUCCESS); - + } catch (ProcessException pe) { + abortAndCloseWriters(); + Throwable t = pe.getCause(); + if (t != null) { + if (t instanceof ConnectionError + || t instanceof HiveWriter.ConnectFailure + || t instanceof HiveWriter.CommitFailure + || t instanceof HiveWriter.TxnBatchFailure + || t instanceof HiveWriter.TxnFailure + || t instanceof InterruptedException) { + log.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", t); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_RETRY); + // Remove the ones we created + if (successFlowFile.get() != null) { + session.remove(successFlowFile.get()); + } + if (failureFlowFile.get() != null) { + session.remove(failureFlowFile.get()); + } + } else { + throw pe; + } + } else { + throw pe; + } + } finally { // Restore original class loader, might not be necessary but is good practice since the processor task changed it Thread.currentThread().setContextClassLoader(originalClassloader); - - } catch (HiveWriter.CommitFailure commitFailure) { - log.error("Error committing to Hive", commitFailure); - session.transfer(flowFile, REL_FAILURE); - } catch (HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure txnFailure) { - log.error("Hive Streaming Transaction Failure", txnFailure); - session.transfer(flowFile, REL_FAILURE); - } catch (InterruptedException e) { - log.error("Hive Streaming Interrupted, flow file will be penalized and routed to retry", e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_RETRY); - } catch (ConnectionError | HiveWriter.ConnectFailure ce) { - log.error("Error while connecting via Hive Streaming, flow file will be penalized and routed to retry", ce); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_RETRY); - } catch (SerializationError se) { - log.error("Serialization exception occurred, record not written to Hive.", se); - session.transfer(flowFile, REL_FAILURE); - } catch (HiveWriter.WriteFailure wf) { - log.error("Error while writing record to Hive Streaming", wf); - abortAndCloseWriters(); - session.transfer(flowFile, REL_FAILURE); } } + private void appendRecordsToFlowFile(ProcessSession session, + List records, + AtomicReference appendFlowFile, + DataFileWriter avroWriter, + DataFileStream reader) throws IOException { + + appendFlowFile.set(session.append(appendFlowFile.get(), (out) -> { + + try (DataFileWriter writer = avroWriter.create(reader.getSchema(), out)) { + for (HiveStreamingRecord sRecord : records) { + writer.append(sRecord.getRecord()); + } + writer.flush(); + } + })); + } + @OnStopped public void cleanup() { ComponentLog log = getLogger(); @@ -637,9 +838,9 @@ public class PutHiveStreaming extends AbstractProcessor { protected class HiveStreamingRecord { private List partitionValues; - private JSONObject record; + private GenericRecord record; - public HiveStreamingRecord(List partitionValues, JSONObject record) { + public HiveStreamingRecord(List partitionValues, GenericRecord record) { this.partitionValues = partitionValues; this.record = record; } @@ -648,7 +849,7 @@ public class PutHiveStreaming extends AbstractProcessor { return partitionValues; } - public JSONObject getRecord() { + public GenericRecord getRecord() { return record; } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java index ad34226b64..ca9ceeb3be 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java @@ -36,9 +36,6 @@ public class HiveUtils { private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class); public static HiveEndPoint makeEndPoint(List partitionVals, HiveOptions options) throws ConnectionError { - if(partitionVals==null) { - return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null); - } return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals); } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java index 15cf978ea2..387e53f442 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java @@ -74,7 +74,7 @@ public class HiveWriter { this.txnBatch = nextTxnBatch(recordWriter); this.closed = false; this.lastUsed = System.currentTimeMillis(); - } catch (InterruptedException | RuntimeException e) { + } catch (InterruptedException | RuntimeException | ConnectFailure e) { throw e; } catch (Exception e) { throw new ConnectFailure(endPoint, e); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index c38a708417..3876e74c24 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -17,8 +17,10 @@ package org.apache.nifi.processors.hive; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; @@ -31,17 +33,19 @@ import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveWriter; -import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -49,6 +53,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import static org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -57,8 +67,8 @@ import static org.mockito.Mockito.when; */ public class TestPutHiveStreaming { - TestRunner runner; - MockPutHiveStreaming processor; + private TestRunner runner; + private MockPutHiveStreaming processor; private KerberosProperties kerberosPropsWithFile; private KerberosProperties kerberosPropsWithoutFile; @@ -84,12 +94,6 @@ public class TestPutHiveStreaming { runner = TestRunners.newTestRunner(processor); } - @After - public void tearDown() throws Exception { - - } - - @Test public void testSetup() throws Exception { runner.setValidateExpressionUsage(false); @@ -126,6 +130,17 @@ public class TestPutHiveStreaming { runner.run(); } + @Test + public void testSingleBatchInvalid() throws Exception { + runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHiveStreaming.DB_NAME, "default"); + runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2"); + runner.assertValid(); + runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "1"); + runner.assertNotValid(); + } + @Test public void onTrigger() throws Exception { runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); @@ -142,7 +157,76 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1); + assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + } + + @Test + public void onTriggerBadInput() throws Exception { + runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHiveStreaming.DB_NAME, "default"); + runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); + runner.setValidateExpressionUsage(false); + runner.enqueue("I am not an Avro record".getBytes()); + runner.run(); + + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); + } + + @Test + public void onTriggerMultipleRecords() throws Exception { + runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHiveStreaming.DB_NAME, "default"); + runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2"); + runner.setValidateExpressionUsage(false); + Map user1 = new HashMap() { + { + put("name", "Joe"); + put("favorite_number", 146); + } + }; + Map user2 = new HashMap() { + { + put("name", "Mary"); + put("favorite_number", 42); + } + }; + Map user3 = new HashMap() { + { + put("name", "Matt"); + put("favorite_number", 3); + } + }; + runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3))); + runner.run(); + + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0); + assertNotNull(resultFlowFile); + assertEquals("3", resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR)); + final DataFileStream reader = new DataFileStream<>( + new ByteArrayInputStream(resultFlowFile.toByteArray()), + new GenericDatumReader()); + + Schema schema = reader.getSchema(); + + // Verify that the schema is preserved + assertTrue(schema.equals(new Schema.Parser().parse(new File("src/test/resources/user.avsc")))); + + // Verify the records are intact. We can't guarantee order so check the total number and non-null fields + assertTrue(reader.hasNext()); + GenericRecord record = reader.next(null); + assertNotNull(record.get("name")); + assertNotNull(record.get("favorite_number")); + assertNull(record.get("favorite_color")); + assertNull(record.get("scale")); + assertTrue(reader.hasNext()); + record = reader.next(record); + assertTrue(reader.hasNext()); + reader.next(record); + assertFalse(reader.hasNext()); } @Test @@ -165,7 +249,35 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1); + assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); + } + + @Test + public void onTriggerWithPartitionColumnsNotInRecord() throws Exception { + runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHiveStreaming.DB_NAME, "default"); + runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); + runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food"); + runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false"); + runner.setValidateExpressionUsage(false); + Map user1 = new HashMap() { + { + put("name", "Joe"); + put("favorite_number", 146); + put("favorite_color", "blue"); + } + }; + + runner.enqueue(createAvroRecord(Collections.singletonList(user1))); + runner.run(); + + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); } @Test @@ -186,7 +298,9 @@ public class TestPutHiveStreaming { } runner.run(10); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 10); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); } @Test @@ -210,7 +324,9 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(1, true); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 2); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); } @Test @@ -230,7 +346,9 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0); } @Test @@ -250,7 +368,7 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1); } @Test @@ -267,10 +385,17 @@ public class TestPutHiveStreaming { put("favorite_number", 146); } }; - runner.enqueue(createAvroRecord(Collections.singletonList(user1))); + Map user2 = new HashMap() { + { + put("name", "Mary"); + put("favorite_number", 42); + } + }; + runner.enqueue(createAvroRecord(Arrays.asList(user1, user2))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); + assertEquals("2", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); } @Test @@ -290,7 +415,8 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); } @Test @@ -310,7 +436,9 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); } @Test @@ -330,7 +458,9 @@ public class TestPutHiveStreaming { runner.enqueue(createAvroRecord(Collections.singletonList(user1))); runner.run(); - runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE); + runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0); } @Test @@ -377,7 +507,6 @@ public class TestPutHiveStreaming { user.put("favorite_color", record.get("favorite_color")); users.add(user); } - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); ByteArrayOutputStream out = new ByteArrayOutputStream(); try (DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { @@ -387,6 +516,7 @@ public class TestPutHiveStreaming { } } return out.toByteArray(); + } private class MockPutHiveStreaming extends PutHiveStreaming { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/resources/user.avsc index d79828c5d0..95ef6e4fd0 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/resources/user.avsc +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/resources/user.avsc @@ -20,6 +20,7 @@ "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "scale", "type": ["double", "null"]} ] }