NIFI-6534 Improve logging in PutHive3Streaming processor

Added logging for PutHive3Streaming when routing to failure

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3640
This commit is contained in:
Tamas Palfy 2019-08-09 10:38:24 +02:00 committed by Matthew Burgess
parent 35d79eaf0f
commit 2baafcc2e6
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 44 additions and 0 deletions

View File

@ -135,5 +135,10 @@
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -431,6 +431,11 @@ public class PutHive3Streaming extends AbstractProcessor {
if (rollbackOnFailure) {
throw new ProcessException(rrfe);
} else {
log.error(
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe
);
session.transfer(flowFile, REL_FAILURE);
}
}
@ -445,6 +450,11 @@ public class PutHive3Streaming extends AbstractProcessor {
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
session.transfer(flowFile, REL_FAILURE);
}
} catch (DiscontinuedException e) {
@ -479,6 +489,11 @@ public class PutHive3Streaming extends AbstractProcessor {
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while trying to stream {} to hive - routing to failure",
new Object[]{flowFile},
se
);
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -97,10 +97,14 @@ import java.util.function.BiFunction;
import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
import static org.apache.nifi.processors.hive.PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
import static org.apache.nifi.processors.hive.PutHive3Streaming.KERBEROS_CREDENTIALS_SERVICE;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
@ -362,6 +366,10 @@ public class TestPutHive3Streaming {
runner.run();
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
assertThat(
runner.getLogger().getErrorMessages(),
hasItem(hasProperty("msg", containsString("Exception while trying to stream {} to hive - routing to failure")))
);
}
@Test
@ -395,6 +403,10 @@ public class TestPutHive3Streaming {
runner.run();
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
assertThat(
runner.getLogger().getErrorMessages(),
hasItem(hasProperty("msg", containsString("Failed to create {} for {} - routing to failure")))
);
}
@Test
@ -465,6 +477,10 @@ public class TestPutHive3Streaming {
runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
assertThat(
runner.getLogger().getErrorMessages(),
hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure")))
);
}
@Test
@ -577,6 +593,10 @@ public class TestPutHive3Streaming {
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_FAILURE).get(0);
assertEquals("0", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
assertThat(
runner.getLogger().getErrorMessages(),
hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure")))
);
}
@Test
@ -630,6 +650,10 @@ public class TestPutHive3Streaming {
runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
assertThat(
runner.getLogger().getErrorMessages(),
hasItem(hasProperty("msg", containsString("Exception while processing {} - routing to failure")))
);
}
@Test