diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 2b39a86b95..479d396630 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -321,9 +321,16 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final Response getResponse; try { getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody); - } catch (IllegalStateException | IOException ioe) { - throw new ProcessException(ioe); + } catch (final Exception e) { + logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); + flowFilesToTransfer.forEach((flowFileToTransfer) -> { + flowFileToTransfer = session.penalize(flowFileToTransfer); + session.transfer(flowFileToTransfer, REL_FAILURE); + }); + flowFilesToTransfer.clear(); + return; } + final int statusCode = getResponse.code(); if (isSuccess(statusCode)) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index fae63eeb62..a8575d4bc2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.net.ConnectException; import java.util.HashMap; import static org.junit.Assert.assertNotNull; @@ -221,6 +222,25 @@ public class TestPutElasticsearchHttp { runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1); } + @Test + public void testPutElasticSearchOnTriggerWithConnectException() throws IOException { + PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); + processor.setStatus(-1, "Connection Exception"); + runner = TestRunners.newTestRunner(processor); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + } + @Test public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures @@ -328,31 +348,36 @@ public class TestPutElasticsearchHttp { @Override public Call answer(InvocationOnMock invocationOnMock) throws Throwable { - Request realRequest = (Request) invocationOnMock.getArguments()[0]; - StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); - sb.append(responseHasFailures); - sb.append("\", \"items\": ["); - if (responseHasFailures) { - // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside - sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); - sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); - sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); - sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},"); - } - sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); - sb.append(statusCode); - sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); - - sb.append("]}"); - Response mockResponse = new Response.Builder() - .request(realRequest) - .protocol(Protocol.HTTP_1_1) - .code(statusCode) - .message(statusMessage) - .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) - .build(); final Call call = mock(Call.class); - when(call.execute()).thenReturn(mockResponse); + if (statusCode != -1) { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); + sb.append(responseHasFailures); + sb.append("\", \"items\": ["); + if (responseHasFailures) { + // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); + sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); + sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); + sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},"); + } + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); + sb.append(statusCode); + sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); + + sb.append("]}"); + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) + .build(); + + when(call.execute()).thenReturn(mockResponse); + } else { + when(call.execute()).thenThrow(ConnectException.class); + } return call; } });