NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http)

This commit is contained in:
Matt Burgess 2016-11-22 20:08:17 -05:00 committed by joewitt
parent 066accc274
commit d3dbac50a8
2 changed files with 14 additions and 10 deletions

View File

@ -211,17 +211,21 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
final BulkResponse response = bulk.execute().actionGet(); final BulkResponse response = bulk.execute().actionGet();
if (response.hasFailures()) { if (response.hasFailures()) {
for (final BulkItemResponse item : response.getItems()) { // Responses are guaranteed to be in order, remove them in reverse order
final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId()); BulkItemResponse[] responses = response.getItems();
if (item.isFailed()) { if (responses != null && responses.length > 0) {
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", for (int i = responses.length - 1; i >= 0; i--) {
new Object[]{flowFile, item.getFailure().getMessage()}); final FlowFile flowFile = flowFilesToTransfer.get(i);
session.transfer(flowFile, REL_FAILURE); if (responses[i].isFailed()) {
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
new Object[]{flowFile, responses[i].getFailure().getMessage()});
session.transfer(flowFile, REL_FAILURE);
} else { } else {
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
}
flowFilesToTransfer.remove(flowFile);
} }
flowFilesToTransfer.remove(flowFile);
} }
} }

View File

@ -328,7 +328,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
if (itemNodeArray.size() > 0) { if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array // All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly // at the same time as the flow file list, moving each to success or failure accordingly
for (int i = 0; i < itemNodeArray.size(); i++) { for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i); JsonNode itemNode = itemNodeArray.get(i);
FlowFile flowFile = flowFilesToTransfer.remove(i); FlowFile flowFile = flowFilesToTransfer.remove(i);
int status = itemNode.findPath("status").asInt(); int status = itemNode.findPath("status").asInt();