NIFI-3082: Fixed status code handling in PutElasticsearchHttp

This closes #1258.
This commit is contained in:
Matt Burgess 2016-11-22 12:53:45 -05:00 committed by Pierre Villard
parent 9e884f6120
commit fa5fed9bb5
2 changed files with 19 additions and 4 deletions

View File

@ -358,9 +358,15 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
session.transfer(flowFilesToTransfer, REL_FAILURE); session.transfer(flowFilesToTransfer, REL_FAILURE);
context.yield(); context.yield();
} }
} else { } else if (statusCode / 100 == 5) {
// Something went wrong during the bulk update, throw a ProcessException to indicate rollback // 5xx -> RETRY, but a server error might last a while, so yield
throw new ProcessException("Received error code " + statusCode + " from Elasticsearch API"); logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
new Object[]{statusCode, getResponse.message()});
session.transfer(flowFilesToTransfer, REL_RETRY);
context.yield();
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
session.transfer(flowFilesToTransfer, REL_FAILURE);
} }
} }
} }

View File

@ -167,7 +167,7 @@ public class TestPutElasticsearchHttp {
runner.assertNotValid(); runner.assertNotValid();
} }
@Test(expected = AssertionError.class) @Test
public void testPutElasticSearchOnTriggerWithFailures() throws IOException { public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true);
processor.setStatus(100, "Should fail"); processor.setStatus(100, "Should fail");
@ -183,6 +183,15 @@ public class TestPutElasticsearchHttp {
put("doc_id", "28039652140"); put("doc_id", "28039652140");
}}); }});
runner.run(1, true, true); runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
runner.clearTransferState();
processor.setStatus(500, "Should retry");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1);
} }
@Test @Test