NIFI-3194: Fixed error handling in PutElasticsearchHttp

Thise closes #1327

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Matt Burgess 2016-12-13 16:26:32 -05:00 committed by jpercivall
parent aef17f9a8b
commit 3b91635398
2 changed files with 58 additions and 26 deletions

View File

@ -321,9 +321,16 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final Response getResponse; final Response getResponse;
try { try {
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody); getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
} catch (IllegalStateException | IOException ioe) { } catch (final Exception e) {
throw new ProcessException(ioe); logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
flowFilesToTransfer.forEach((flowFileToTransfer) -> {
flowFileToTransfer = session.penalize(flowFileToTransfer);
session.transfer(flowFileToTransfer, REL_FAILURE);
});
flowFilesToTransfer.clear();
return;
} }
final int statusCode = getResponse.code(); final int statusCode = getResponse.code();
if (isSuccess(statusCode)) { if (isSuccess(statusCode)) {

View File

@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap; import java.util.HashMap;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -221,6 +222,25 @@ public class TestPutElasticsearchHttp {
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1); runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1);
} }
@Test
public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true);
processor.setStatus(-1, "Connection Exception");
runner = TestRunners.newTestRunner(processor); // simulate failures
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
}
@Test @Test
public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
@ -328,31 +348,36 @@ public class TestPutElasticsearchHttp {
@Override @Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable { public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
sb.append(responseHasFailures);
sb.append("\", \"items\": [");
if (responseHasFailures) {
// This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
}
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
sb.append(statusCode);
sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
sb.append("]}");
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
.build();
final Call call = mock(Call.class); final Call call = mock(Call.class);
when(call.execute()).thenReturn(mockResponse); if (statusCode != -1) {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
sb.append(responseHasFailures);
sb.append("\", \"items\": [");
if (responseHasFailures) {
// This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
}
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
sb.append(statusCode);
sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
sb.append("]}");
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
.build();
when(call.execute()).thenReturn(mockResponse);
} else {
when(call.execute()).thenThrow(ConnectException.class);
}
return call; return call;
} }
}); });