From fa5fed9bb59eb485e48dd7350bf693a3039307ef Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 22 Nov 2016 12:53:45 -0500 Subject: [PATCH] NIFI-3082: Fixed status code handling in PutElasticsearchHttp This closes #1258. --- .../elasticsearch/PutElasticsearchHttp.java | 12 +++++++++--- .../elasticsearch/TestPutElasticsearchHttp.java | 11 ++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 92b1452600..71171006c0 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -358,9 +358,15 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { session.transfer(flowFilesToTransfer, REL_FAILURE); context.yield(); } - } else { - // Something went wrong during the bulk update, throw a ProcessException to indicate rollback - throw new ProcessException("Received error code " + statusCode + " from Elasticsearch API"); + } else if (statusCode / 100 == 5) { + // 5xx -> RETRY, but a server error might last a while, so yield + logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", + new Object[]{statusCode, getResponse.message()}); + session.transfer(flowFilesToTransfer, REL_RETRY); + context.yield(); + } else { // 1xx, 3xx, 4xx, etc. -> NO RETRY + logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()}); + session.transfer(flowFilesToTransfer, REL_FAILURE); } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index c3d5a3497c..1172004bb7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -167,7 +167,7 @@ public class TestPutElasticsearchHttp { runner.assertNotValid(); } - @Test(expected = AssertionError.class) + @Test public void testPutElasticSearchOnTriggerWithFailures() throws IOException { PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); processor.setStatus(100, "Should fail"); @@ -183,6 +183,15 @@ public class TestPutElasticsearchHttp { put("doc_id", "28039652140"); }}); runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + runner.clearTransferState(); + + processor.setStatus(500, "Should retry"); + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1); } @Test