mirror of https://github.com/apache/nifi.git
NIFI-1619: Fix Elasticsearch processor bug when flow file missing ID attribute
This closes #269 Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
f6705f234c
commit
e1cf37fb89
|
@ -49,6 +49,7 @@ import java.nio.charset.Charset;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -156,18 +157,21 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
}
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
|
||||
// Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
|
||||
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
|
||||
try {
|
||||
final BulkRequestBuilder bulk = esClient.get().prepareBulk();
|
||||
if (authToken != null) {
|
||||
bulk.putHeader("Authorization", authToken);
|
||||
}
|
||||
|
||||
for (FlowFile file : flowFiles) {
|
||||
final String id = file.getAttribute(id_attribute);
|
||||
if (id == null) {
|
||||
logger.error("No value in identifier attribute {} for {}", new Object[]{id_attribute, file});
|
||||
logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file});
|
||||
flowFilesToTransfer.remove(file);
|
||||
session.transfer(file, REL_FAILURE);
|
||||
}
|
||||
} else {
|
||||
session.read(file, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
|
@ -178,24 +182,26 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
final BulkResponse response = bulk.execute().actionGet();
|
||||
if (response.hasFailures()) {
|
||||
for (final BulkItemResponse item : response.getItems()) {
|
||||
final FlowFile flowFile = flowFiles.get(item.getItemId());
|
||||
final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
|
||||
if (item.isFailed()) {
|
||||
logger.error("Failed to insert {} into Elasticsearch due to {}",
|
||||
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
|
||||
new Object[]{flowFile, item.getFailure().getMessage()});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
||||
} else {
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
flowFilesToTransfer.remove(flowFile);
|
||||
}
|
||||
} else {
|
||||
session.transfer(flowFiles, REL_SUCCESS);
|
||||
}
|
||||
|
||||
// Transfer any remaining flowfiles to success
|
||||
session.transfer(flowFilesToTransfer, REL_SUCCESS);
|
||||
|
||||
} catch (NoNodeAvailableException
|
||||
| ElasticsearchTimeoutException
|
||||
|
@ -209,14 +215,14 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
|
||||
"the NiFi logs.",
|
||||
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
|
||||
session.transfer(flowFiles, REL_RETRY);
|
||||
session.transfer(flowFilesToTransfer, REL_RETRY);
|
||||
context.yield();
|
||||
|
||||
} catch (Exception exceptionToFail) {
|
||||
logger.error("Failed to insert into Elasticsearch due to {}",
|
||||
logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure",
|
||||
new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
|
||||
|
||||
session.transfer(flowFiles, REL_FAILURE);
|
||||
session.transfer(flowFilesToTransfer, REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -188,6 +188,26 @@ public class TestPutElasticsearch {
|
|||
runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
|
|
Loading…
Reference in New Issue