diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java index 643edbb6ef..84f31b7704 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -143,6 +144,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc return propertyDescriptors; } + @Override @OnScheduled public void setup(ProcessContext context) { super.setup(context); @@ -165,6 +167,8 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc try { logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); + final long startNanos = System.nanoTime(); + GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId); if (authToken != null) { getRequestBuilder.putHeader("Authorization", authToken); @@ -189,6 +193,10 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc } }); logger.debug("Elasticsearch document " + docId + " fetched, routing to success"); + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId; + session.getProvenanceReporter().fetch(flowFile, uri, millis); session.transfer(flowFile, REL_SUCCESS); } } catch (NoNodeAvailableException @@ -211,6 +219,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc /** * Dispose of ElasticSearch client */ + @Override @OnStopped public void closeClient() { super.closeClient(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index ab61f674bc..1f68c22678 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -230,6 +230,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces session.transfer(flowFile, REL_FAILURE); } else { + session.getProvenanceReporter().send(flowFile, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + responses[i].getIndex()); session.transfer(flowFile, REL_SUCCESS); } flowFilesToTransfer.remove(flowFile); @@ -238,7 +239,12 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces } // Transfer any remaining flowfiles to success - session.transfer(flowFilesToTransfer, REL_SUCCESS); + flowFilesToTransfer.forEach(file -> { + session.transfer(file, REL_SUCCESS); + // Record provenance event + session.getProvenanceReporter().send(file, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + + context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue()); + }); } catch (NoNodeAvailableException | ElasticsearchTimeoutException diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java index ba22b65c92..30c42d0148 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; @@ -49,6 +50,8 @@ import java.net.MalformedURLException; import java.util.HashMap; import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; @@ -97,6 +100,8 @@ public class TestFetchElasticsearch { runner.run(1, true, true); runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); + assertFalse(runner.getProvenanceEvents().isEmpty()); + runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.FETCH); }); final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0); assertNotNull(out); out.assertAttributeEquals("doc_id", "28039652140"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 6d6da5a01f..6da6a2971c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -50,6 +51,8 @@ import java.io.InputStream; import java.util.HashMap; import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -97,6 +100,8 @@ public class TestPutElasticsearch { runner.run(1, true, true); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + assertFalse(runner.getProvenanceEvents().isEmpty()); + runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.SEND); }); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); assertNotNull(out); out.assertAttributeEquals("doc_id", "28039652140");