Added provenance reporting

This closes #2650

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
JohannesDaniel 2018-04-22 21:06:33 +02:00 committed by Mike Thomsen
parent 54eb6bc232
commit 8b6539e3c7

View File

@ -33,6 +33,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -61,6 +62,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.solr.client.solrj.SolrQuery;
@ -336,6 +338,8 @@ public class GetSolr extends SolrProcessor {
solrQuery.setParam("sort", sortClause.toString());
while (continuePaging.get()) {
StopWatch timer = new StopWatch(true);
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
@ -385,8 +389,19 @@ public class GetSolr extends SolrProcessor {
}
}
});
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
}
timer.stop();
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (getSolrLocation().equals(SolrUtils.SOLR_TYPE_CLOUD.getValue())) {
transitUri.append(":").append(context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
session.transfer(flowFile, REL_SUCCESS);
}
continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));