diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 6319791e9a..a0c986c6b2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -146,9 +146,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue(); final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final String docType = context.getProperty(TYPE).evaluateAttributeExpressions().getValue(); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final List flowFiles = session.get(batchSize); @@ -166,6 +164,9 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { } for (FlowFile file : flowFiles) { + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); + final String id = file.getAttribute(id_attribute); if (id == null) { logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file}); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 957332a4f3..fa2767b76a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -41,6 +41,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; @@ -209,6 +211,43 @@ public class TestPutElasticsearch { assertNotNull(out); } + @Test + public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "${i}"); + runner.setProperty(PutElasticsearch.TYPE, "${type}"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652144"); + put("i", "doc"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + runner.clearTransferState(); + + // Now try an empty attribute value, should fail + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652144"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_RETRY, 1); + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearch.REL_RETRY).get(0); + assertNotNull(out2); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -226,7 +265,7 @@ public class TestPutElasticsearch { @Override public void createElasticsearchClient(ProcessContext context) throws ProcessException { - Client mockClient = mock(Client.class); + final Client mockClient = mock(Client.class); BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE)); if (exceptionToThrow != null) { doThrow(exceptionToThrow).when(bulkRequestBuilder).execute(); @@ -235,8 +274,23 @@ public class TestPutElasticsearch { } when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder); - IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); - when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); + when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(new Answer() { + @Override + public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + String arg1 = (String) args[0]; + if (arg1.isEmpty()) { + throw new NoNodeAvailableException("Needs index"); + } + String arg2 = (String) args[1]; + if (arg2.isEmpty()) { + throw new NoNodeAvailableException("Needs doc type"); + } else { + IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); + return indexRequestBuilder; + } + } + }); esClient.set(mockClient); }