NIFI-4410: Improved error handling/logging in PutElasticsearchHttp processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2175.
This commit is contained in:
Matthew Burgess 2017-09-25 22:25:29 -04:00 committed by Pierre Villard
parent e74c67f779
commit 4df3eb567d
3 changed files with 70 additions and 17 deletions

View File

@ -355,15 +355,27 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray.size() > 0) { if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array // All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly // at the same time as the flow file list, moving each to success or failure accordingly,
// but only keep the first error for logging
String errorReason = null;
for (int i = itemNodeArray.size() - 1; i >= 0; i--) { for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i); JsonNode itemNode = itemNodeArray.get(i);
if (flowFilesToTransfer.size() > i) {
FlowFile flowFile = flowFilesToTransfer.remove(i); FlowFile flowFile = flowFilesToTransfer.remove(i);
int status = itemNode.findPath("status").asInt(); int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) { if (!isSuccess(status)) {
String reason = itemNode.findPath("//error/reason").asText(); if (errorReason == null) {
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
new Object[]{flowFile, reason}); String reason = itemNode.findPath("//result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("//error/reason").asText();
}
errorReason = reason;
logger.error("Failed to process {} due to {}, transferring to failure",
new Object[]{flowFile, errorReason});
}
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} else { } else {
@ -374,6 +386,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
} }
} }
} }
}
// Transfer any remaining flowfiles to success // Transfer any remaining flowfiles to success
flowFilesToTransfer.forEach(file -> { flowFilesToTransfer.forEach(file -> {
session.transfer(file, REL_SUCCESS); session.transfer(file, REL_SUCCESS);

View File

@ -394,14 +394,24 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray.size() > 0) { if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array // All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, logging failures accordingly // at the same time as the flow file list, moving each to success or failure accordingly,
// but only keep the first error for logging
String errorReason = null;
for (int i = itemNodeArray.size() - 1; i >= 0; i--) { for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i); JsonNode itemNode = itemNodeArray.get(i);
int status = itemNode.findPath("status").asInt(); int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) { if (!isSuccess(status)) {
String reason = itemNode.findPath("//error/reason").asText(); if (errorReason == null) {
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
new Object[]{flowFile, reason}); String reason = itemNode.findPath("//result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("//error/reason").asText();
}
errorReason = reason;
logger.error("Failed to process {} due to {}, transferring to failure",
new Object[]{flowFile, errorReason});
}
} }
} }
} }

View File

@ -350,6 +350,27 @@ public class TestPutElasticsearchHttp {
assertNotNull(out); assertNotNull(out);
} }
@Test
public void testPutElasticSearchOnTriggerWithDocumentNotFound() throws IOException {
PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true);
processor.setResultField("not_found");
runner = TestRunners.newTestRunner(processor); // simulate failures
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttp.INDEX_OP, "delete");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
runner.clearTransferState();
}
/** /**
* A Test class that extends the processor in order to inject/mock behavior * A Test class that extends the processor in order to inject/mock behavior
*/ */
@ -359,6 +380,7 @@ public class TestPutElasticsearchHttp {
int statusCode = 200; int statusCode = 200;
String statusMessage = "OK"; String statusMessage = "OK";
String expectedUrl = null; String expectedUrl = null;
String resultField = null;
PutElasticsearchTestProcessor(boolean responseHasFailures) { PutElasticsearchTestProcessor(boolean responseHasFailures) {
this.responseHasFailures = responseHasFailures; this.responseHasFailures = responseHasFailures;
@ -373,6 +395,10 @@ public class TestPutElasticsearchHttp {
expectedUrl = url; expectedUrl = url;
} }
public void setResultField(String resultField) {
this.resultField = resultField;
}
@Override @Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException { protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class); client = mock(OkHttpClient.class);
@ -391,7 +417,11 @@ public class TestPutElasticsearchHttp {
if (responseHasFailures) { if (responseHasFailures) {
// This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
if(resultField != null) {
sb.append("\"result\":{\"not_found\",");
} else {
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
}
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},"); sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
} }