NIFI-4972 - SelectHiveQL to emit FETCH provenance event

SelectHiveQL should emit FETCH instead of CONTENT_MODIFIED when it has
incoming connections.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2543.
This commit is contained in:
Koji Kawamura 2018-03-14 17:42:10 +09:00 committed by Pierre Villard
parent b0e5e1644c
commit d3f54994a6
3 changed files with 21 additions and 5 deletions

View File

@ -392,7 +392,7 @@ Processor 3</pre>
</td>
<td>
SEND<br/>
RECEIVE<br/>
RECEIVE, FETCH<br/>
</td>
<td>jdbc:hive2://hive.example.com:10000/default</td>
<td>hive_table</td>

View File

@ -410,10 +410,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
new Object[]{flowfile, nrOfRows.get()});
if (context.hasIncomingConnection()) {
// If the flow file came from an incoming connection, issue a Modify Content provenance event
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
// If the flow file came from an incoming connection, issue a Fetch provenance event
session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(),
"Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} else {
// If we created a flow file from rows received from Hive, issue a Receive provenance event
session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));

View File

@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -118,11 +120,26 @@ public class TestSelectHiveQL {
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false);
invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType());
assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri());
}
@Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FETCH, provenance0.getEventType());
assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri());
final ProvenanceEventRecord provenance1 = provenanceEvents.get(1);
assertEquals(ProvenanceEventType.FORK, provenance1.getEventType());
}