diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 6452bc7f69..507558608e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -100,6 +100,16 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { AttributeExpression.ResultType.STRING, true)) .build(); + public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() + .name("Index Operation") + .description("The type of the operation used to index (index, update, upsert)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( + AttributeExpression.ResultType.STRING, true)) + .defaultValue("index") + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The preferred number of FlowFiles to put to the database in a single transaction") @@ -134,6 +144,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { descriptors.add(TYPE); descriptors.add(CHARSET); descriptors.add(BATCH_SIZE); + descriptors.add(INDEX_OP); return Collections.unmodifiableList(descriptors); } @@ -166,6 +177,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { for (FlowFile file : flowFiles) { final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); + final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue(); final String id = file.getAttribute(id_attribute); if (id == null) { @@ -178,8 +190,20 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { public void process(final InputStream in) throws IOException { String json = IOUtils.toString(in, charset) .replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); - bulk.add(esClient.get().prepareIndex(index, docType, id) - .setSource(json.getBytes(charset))); + + if (indexOp.equalsIgnoreCase("index")) { + bulk.add(esClient.get().prepareIndex(index, docType, id) + .setSource(json.getBytes(charset))); + } else if (indexOp.equalsIgnoreCase("upsert")) { + bulk.add(esClient.get().prepareUpdate(index, docType, id) + .setDoc(json.getBytes(charset)) + .setDocAsUpsert(true)); + } else if (indexOp.equalsIgnoreCase("update")) { + bulk.add(esClient.get().prepareUpdate(index, docType, id) + .setDoc(json.getBytes(charset))); + } else { + throw new IOException("Index operation: " + indexOp + " not supported."); + } } }); } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index fa2767b76a..ce25b8108a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -248,6 +248,36 @@ public class TestPutElasticsearch { assertNotNull(out2); } + @Test + public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(PutElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + runner.setProperty(PutElasticsearch.INDEX_OP, "index_fail"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); + assertNotNull(out); + } + /** * A Test class that extends the processor in order to inject/mock behavior */