NIFI-3087: This closes #1263. Added unit tests to PutElasticsearch(Http) to illustrate issue

This commit is contained in:
Matt Burgess 2016-11-22 20:07:31 -05:00 committed by joewitt
parent d3dbac50a8
commit 7fc7494b21
2 changed files with 25 additions and 13 deletions

View File

@ -112,15 +112,19 @@ public class TestPutElasticsearch {
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.INDEX, "doc");
runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "2");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{ runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140"); put("doc_id", "28039652140");
}}); }});
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652141");
}});
runner.run(1, true, true); runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
assertNotNull(out); assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140"); out.assertAttributeEquals("doc_id", "28039652140");
@ -349,10 +353,16 @@ public class TestPutElasticsearch {
public BulkResponse get() throws InterruptedException, ExecutionException { public BulkResponse get() throws InterruptedException, ExecutionException {
BulkResponse response = mock(BulkResponse.class); BulkResponse response = mock(BulkResponse.class);
when(response.hasFailures()).thenReturn(responseHasFailures); when(response.hasFailures()).thenReturn(responseHasFailures);
BulkItemResponse item = mock(BulkItemResponse.class); BulkItemResponse item1 = mock(BulkItemResponse.class);
when(item.getItemId()).thenReturn(1); BulkItemResponse item2 = mock(BulkItemResponse.class);
when(item.isFailed()).thenReturn(true); when(item1.getItemId()).thenReturn(1);
when(response.getItems()).thenReturn(new BulkItemResponse[]{item}); when(item1.isFailed()).thenReturn(true);
BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
when(failure.getMessage()).thenReturn("Bad message");
when(item1.getFailure()).thenReturn(failure);
when(item2.getItemId()).thenReturn(2);
when(item2.isFailed()).thenReturn(false);
when(response.getItems()).thenReturn(new BulkItemResponse[]{item1, item2});
return response; return response;
} }

View File

@ -201,13 +201,15 @@ public class TestPutElasticsearchHttp {
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status"); runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "2");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample);
runner.enqueue(docExample); runner.enqueue(docExample);
runner.run(1, true, true); runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); runner.assertTransferCount(PutElasticsearchHttp.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
assertNotNull(out); assertNotNull(out);
} }
@ -308,12 +310,12 @@ public class TestPutElasticsearchHttp {
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}}"); sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
} else {
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
sb.append(statusCode);
sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
} }
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
sb.append(statusCode);
sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
sb.append("]}"); sb.append("]}");
Response mockResponse = new Response.Builder() Response mockResponse = new Response.Builder()
.request(realRequest) .request(realRequest)