mirror of https://github.com/apache/nifi.git
NIFI-9764: Atlas reporting task sends 'unknown' hive_table when table is name not available
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #5839.
This commit is contained in:
parent
df00cc6cb5
commit
92202a5b95
|
@ -26,6 +26,7 @@ import org.apache.nifi.util.Tuple;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -61,6 +62,8 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
|
|||
private static final String URI_PATTERN_STR = "jdbc:hive2://([^/]+)/?(.*)$";
|
||||
private static final Pattern URI_PATTERN = Pattern.compile(URI_PATTERN_STR);
|
||||
|
||||
private static final String UNKNOWN_TABLE = "unknown";
|
||||
|
||||
@Override
|
||||
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
|
||||
|
||||
|
@ -72,7 +75,7 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
|
|||
|
||||
final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
|
||||
if (!uriMatcher.matches()) {
|
||||
logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
|
||||
logger.warn("Unexpected transit URI: {}", transitUri);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -89,13 +92,20 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
|
|||
connectedDatabaseName = "default";
|
||||
}
|
||||
|
||||
final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
|
||||
final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
|
||||
Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
|
||||
Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
|
||||
|
||||
if (inputTables.isEmpty() && outputTables.isEmpty()) {
|
||||
// If input/output tables are unknown, create database level lineage.
|
||||
// If input/output tables are unknown, create hive_table entity with name 'unknown' (hive_db is not a DataSet entity and therefore it cannot be used in the lineage).
|
||||
// Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
|
||||
return getDatabaseRef(event.getComponentId(), event.getEventType(), namespace, connectedDatabaseName.toLowerCase());
|
||||
final ProvenanceEventType eventType = event.getEventType();
|
||||
if (eventType == ProvenanceEventType.RECEIVE || eventType == ProvenanceEventType.FETCH) {
|
||||
logger.warn("Input table name is missing, defaults to '{}'. Transit URI: {}", UNKNOWN_TABLE, transitUri);
|
||||
inputTables = Collections.singleton(new Tuple<>(connectedDatabaseName.toLowerCase(), UNKNOWN_TABLE));
|
||||
} else if (eventType == ProvenanceEventType.SEND) {
|
||||
logger.warn("Output table name is missing, defaults to '{}'. Transit URI: {}", UNKNOWN_TABLE, transitUri);
|
||||
outputTables = Collections.singleton(new Tuple<>(connectedDatabaseName.toLowerCase(), UNKNOWN_TABLE));
|
||||
}
|
||||
}
|
||||
|
||||
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
|
||||
|
@ -104,13 +114,6 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
|
|||
return refs;
|
||||
}
|
||||
|
||||
private DataSetRefs getDatabaseRef(String componentId, ProvenanceEventType eventType,
|
||||
String namespace, String databaseName) {
|
||||
final Referenceable ref = createDatabaseRef(namespace, databaseName);
|
||||
|
||||
return singleDataSetRef(componentId, eventType, ref);
|
||||
}
|
||||
|
||||
private void addRefs(DataSetRefs refs, boolean isInput, String namespace,
|
||||
Set<Tuple<String, String>> tableNames) {
|
||||
tableNames.forEach(tableName -> {
|
||||
|
|
|
@ -45,10 +45,11 @@ public class TestHive2JDBC {
|
|||
|
||||
/**
|
||||
* If a provenance event does not have table name attributes,
|
||||
* then a database lineage should be created.
|
||||
* then a table lineage is created with table name 'unknown'.
|
||||
* Database lineage cannot be sent to Atlas because hive_db is not a DataSet entity.
|
||||
*/
|
||||
@Test
|
||||
public void testDatabaseLineage() {
|
||||
public void testUnknownTableLineage() {
|
||||
final String processorName = "PutHiveQL";
|
||||
final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
|
||||
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
|
||||
|
@ -69,9 +70,9 @@ public class TestHive2JDBC {
|
|||
assertEquals(0, refs.getInputs().size());
|
||||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_db", ref.getTypeName());
|
||||
assertEquals("database_a", ref.get(ATTR_NAME));
|
||||
assertEquals("database_a@namespace1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("unknown", ref.get(ATTR_NAME));
|
||||
assertEquals("database_a.unknown@namespace1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue