NIFI-3057 Added provenance events to PutElasticsearch and FetchElasticsearch

This closes: #1370.

Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
Pierre Villard 2016-12-29 17:42:24 +01:00 committed by Andre F de Miranda
parent 2ef7c15b5d
commit 1e43694145
4 changed files with 26 additions and 1 deletions

View File

@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -143,6 +144,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
return propertyDescriptors; return propertyDescriptors;
} }
@Override
@OnScheduled @OnScheduled
public void setup(ProcessContext context) { public void setup(ProcessContext context) {
super.setup(context); super.setup(context);
@ -165,6 +167,8 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
try { try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
final long startNanos = System.nanoTime();
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId); GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
if (authToken != null) { if (authToken != null) {
getRequestBuilder.putHeader("Authorization", authToken); getRequestBuilder.putHeader("Authorization", authToken);
@ -189,6 +193,10 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
} }
}); });
logger.debug("Elasticsearch document " + docId + " fetched, routing to success"); logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
session.getProvenanceReporter().fetch(flowFile, uri, millis);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
} catch (NoNodeAvailableException } catch (NoNodeAvailableException
@ -211,6 +219,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
/** /**
* Dispose of ElasticSearch client * Dispose of ElasticSearch client
*/ */
@Override
@OnStopped @OnStopped
public void closeClient() { public void closeClient() {
super.closeClient(); super.closeClient();

View File

@ -230,6 +230,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} else { } else {
session.getProvenanceReporter().send(flowFile, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + responses[i].getIndex());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
flowFilesToTransfer.remove(flowFile); flowFilesToTransfer.remove(flowFile);
@ -238,7 +239,12 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
} }
// Transfer any remaining flowfiles to success // Transfer any remaining flowfiles to success
session.transfer(flowFilesToTransfer, REL_SUCCESS); flowFilesToTransfer.forEach(file -> {
session.transfer(file, REL_SUCCESS);
// Record provenance event
session.getProvenanceReporter().send(file, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" +
context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue());
});
} catch (NoNodeAvailableException } catch (NoNodeAvailableException
| ElasticsearchTimeoutException | ElasticsearchTimeoutException

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessContext;
@ -49,6 +50,8 @@ import java.net.MalformedURLException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
@ -97,6 +100,8 @@ public class TestFetchElasticsearch {
runner.run(1, true, true); runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
assertFalse(runner.getProvenanceEvents().isEmpty());
runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.FETCH); });
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out); assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140"); out.assertAttributeEquals("doc_id", "28039652140");

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -50,6 +51,8 @@ import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -97,6 +100,8 @@ public class TestPutElasticsearch {
runner.run(1, true, true); runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
assertFalse(runner.getProvenanceEvents().isEmpty());
runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.SEND); });
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out); assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140"); out.assertAttributeEquals("doc_id", "28039652140");