diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml index 33a148d048..e790283931 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml @@ -135,5 +135,10 @@ 1.10.0-SNAPSHOT test + + org.hamcrest + hamcrest-all + test + diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java index 5f3445aba5..2224b06493 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java @@ -431,6 +431,11 @@ public class PutHive3Streaming extends AbstractProcessor { if (rollbackOnFailure) { throw new ProcessException(rrfe); } else { + log.error( + "Failed to create {} for {} - routing to failure", + new Object[]{RecordReader.class.getSimpleName(), flowFile}, + rrfe + ); session.transfer(flowFile, REL_FAILURE); } } @@ -445,6 +450,11 @@ public class PutHive3Streaming extends AbstractProcessor { updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten())); updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName()); flowFile = session.putAllAttributes(flowFile, updateAttributes); + log.error( + "Exception while processing {} - routing to failure", + new Object[]{flowFile}, + e + ); session.transfer(flowFile, REL_FAILURE); } } catch (DiscontinuedException e) { @@ -479,6 +489,11 @@ public class PutHive3Streaming extends AbstractProcessor { updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten())); updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName()); flowFile = session.putAllAttributes(flowFile, updateAttributes); + log.error( + "Exception while trying to stream {} to hive - routing to failure", + new Object[]{flowFile}, + se + ); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index 6acabe3a30..c82f55c34c 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -97,10 +97,14 @@ import java.util.function.BiFunction; import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES; import static org.apache.nifi.processors.hive.PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR; import static org.apache.nifi.processors.hive.PutHive3Streaming.KERBEROS_CREDENTIALS_SERVICE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.Matchers.hasProperty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyString; @@ -362,6 +366,10 @@ public class TestPutHive3Streaming { runner.run(); runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + assertThat( + runner.getLogger().getErrorMessages(), + hasItem(hasProperty("msg", containsString("Exception while trying to stream {} to hive - routing to failure"))) + ); } @Test @@ -395,6 +403,10 @@ public class TestPutHive3Streaming { runner.run(); runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + assertThat( + runner.getLogger().getErrorMessages(), + hasItem(hasProperty("msg", containsString("Failed to create {} for {} - routing to failure"))) + ); } @Test @@ -465,6 +477,10 @@ public class TestPutHive3Streaming { runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + assertThat( + runner.getLogger().getErrorMessages(), + hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure"))) + ); } @Test @@ -577,6 +593,10 @@ public class TestPutHive3Streaming { final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_FAILURE).get(0); assertEquals("0", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + assertThat( + runner.getLogger().getErrorMessages(), + hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure"))) + ); } @Test @@ -630,6 +650,10 @@ public class TestPutHive3Streaming { runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + assertThat( + runner.getLogger().getErrorMessages(), + hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure"))) + ); } @Test