This commit is contained in:
Mark Payne 2016-03-11 12:55:23 -05:00
commit a85c119533
2 changed files with 44 additions and 18 deletions

View File

@ -49,6 +49,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -156,18 +157,21 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
} }
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
// Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
try { try {
final BulkRequestBuilder bulk = esClient.get().prepareBulk(); final BulkRequestBuilder bulk = esClient.get().prepareBulk();
if (authToken != null) { if (authToken != null) {
bulk.putHeader("Authorization", authToken); bulk.putHeader("Authorization", authToken);
} }
for (FlowFile file : flowFiles) { for (FlowFile file : flowFiles) {
final String id = file.getAttribute(id_attribute); final String id = file.getAttribute(id_attribute);
if (id == null) { if (id == null) {
logger.error("No value in identifier attribute {} for {}", new Object[]{id_attribute, file}); logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file});
flowFilesToTransfer.remove(file);
session.transfer(file, REL_FAILURE); session.transfer(file, REL_FAILURE);
} } else {
session.read(file, new InputStreamCallback() { session.read(file, new InputStreamCallback() {
@Override @Override
public void process(final InputStream in) throws IOException { public void process(final InputStream in) throws IOException {
@ -178,24 +182,26 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
} }
}); });
} }
}
final BulkResponse response = bulk.execute().actionGet(); final BulkResponse response = bulk.execute().actionGet();
if (response.hasFailures()) { if (response.hasFailures()) {
for (final BulkItemResponse item : response.getItems()) { for (final BulkItemResponse item : response.getItems()) {
final FlowFile flowFile = flowFiles.get(item.getItemId()); final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
if (item.isFailed()) { if (item.isFailed()) {
logger.error("Failed to insert {} into Elasticsearch due to {}", logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
new Object[]{flowFile, item.getFailure().getMessage()}); new Object[]{flowFile, item.getFailure().getMessage()});
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} else { } else {
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
flowFilesToTransfer.remove(flowFile);
} }
} else {
session.transfer(flowFiles, REL_SUCCESS);
} }
// Transfer any remaining flowfiles to success
session.transfer(flowFilesToTransfer, REL_SUCCESS);
} catch (NoNodeAvailableException } catch (NoNodeAvailableException
| ElasticsearchTimeoutException | ElasticsearchTimeoutException
@ -209,14 +215,14 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " + logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
"the NiFi logs.", "the NiFi logs.",
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry); new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
session.transfer(flowFiles, REL_RETRY); session.transfer(flowFilesToTransfer, REL_RETRY);
context.yield(); context.yield();
} catch (Exception exceptionToFail) { } catch (Exception exceptionToFail) {
logger.error("Failed to insert into Elasticsearch due to {}", logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure",
new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail); new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
session.transfer(flowFiles, REL_FAILURE); session.transfer(flowFilesToTransfer, REL_FAILURE);
context.yield(); context.yield();
} }
} }

View File

@ -188,6 +188,26 @@ public class TestPutElasticsearch {
runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1); runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
} }
@Test
public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "doc");
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
assertNotNull(out);
}
/** /**
* A Test class that extends the processor in order to inject/mock behavior * A Test class that extends the processor in order to inject/mock behavior