diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java index 82843cf109..810471bb09 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java @@ -435,7 +435,11 @@ public class PutHive3Streaming extends AbstractProcessor { throw new ShouldRetryException(te.getLocalizedMessage(), te); } } catch (RecordReaderFactoryException rrfe) { - throw new ProcessException(rrfe); + if (rollbackOnFailure) { + throw new ProcessException(rrfe); + } else { + session.transfer(flowFile, REL_FAILURE); + } } } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) { if (rollbackOnFailure) { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index ee05416ad8..74623d0f8c 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -54,8 +54,10 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MapRecord; @@ -152,18 +154,26 @@ public class TestPutHive3Streaming { } private void configure(final PutHive3Streaming processor, final int numUsers) throws InitializationException { - configure(processor, numUsers, -1); + configure(processor, numUsers, false, -1); } - private void configure(final PutHive3Streaming processor, final int numUsers, int failAfter) throws InitializationException { - configure(processor, numUsers, failAfter, null); + private void configure(final PutHive3Streaming processor, final int numUsers, boolean failOnCreateReader, int failAfter) throws InitializationException { + configure(processor, numUsers, failOnCreateReader, failAfter, null); } - private void configure(final PutHive3Streaming processor, final int numUsers, final int failAfter, + private void configure(final PutHive3Streaming processor, final int numUsers, final boolean failOnCreateReader, final int failAfter, final BiFunction recordGenerator) throws InitializationException { runner = TestRunners.newTestRunner(processor); runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); - MockRecordParser readerFactory = new MockRecordParser(); + MockRecordParser readerFactory = new MockRecordParser() { + @Override + public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + if (failOnCreateReader) { + throw new SchemaNotFoundException("test"); + } + return super.createRecordReader(variables, in, logger); + } + }; final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); for (final RecordField recordField : recordSchema.getFields()) { readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); @@ -344,7 +354,7 @@ public class TestPutHive3Streaming { @Test public void onTriggerBadInput() throws Exception { - configure(processor, 1, 0); + configure(processor, 1, false, 0); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); @@ -356,7 +366,7 @@ public class TestPutHive3Streaming { @Test public void onTriggerBadInputRollbackOnFailure() throws Exception { - configure(processor, 1, 0); + configure(processor, 1, false, 0); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); @@ -375,6 +385,39 @@ public class TestPutHive3Streaming { assertEquals(1, runner.getQueueSize().getObjectCount()); } + @Test + public void onTriggerBadCreate() throws Exception { + configure(processor, 1, true, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + } + + @Test + public void onTriggerBadCreateRollbackOnFailure() throws Exception { + configure(processor, 1, true, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + runner.enqueue(new byte[0]); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + // Assert incoming FlowFile stays in input queue. + assertEquals(1, runner.getQueueSize().getObjectCount()); + } + @Test public void onTriggerMultipleRecordsSingleTransaction() throws Exception {