* <li>name=tableName (example: myTable)
*/
public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
@@ -57,11 +57,11 @@ public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
final Referenceable ref = new Referenceable(TYPE);
final String[] hostNames = splitHostNames(uriMatcher.group(1));
- final String clusterName = context.getClusterResolver().fromHostNames(hostNames);
+ final String namespace = context.getNamespaceResolver().fromHostNames(hostNames);
final String tableName = uriMatcher.group(2);
ref.set(ATTR_NAME, tableName);
- ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, tableName));
+ ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, tableName));
// TODO: 'uri' is a mandatory attribute, but what should we set?
ref.set(ATTR_URI, transitUri);
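For context, a hedged walk-through of this analyzer; the transit URI shape and host values below are assumptions for illustration, only the group(1)/group(2) handling and the 'hostnamePattern.' mapping are from this patch:

    // Hypothetical example: for a transit URI like
    //   hbase://zk0.a.example.com,zk1.a.example.com/myTable
    // uriMatcher.group(1) holds the host list and group(2) the table name.
    // splitHostNames(...) yields ["zk0.a.example.com", "zk1.a.example.com"];
    // if a dynamic property 'hostnamePattern.ns1' matches those hosts,
    // fromHostNames(...) returns "ns1", so the entity is reported with
    //   name = "myTable", qualifiedName = "myTable@ns1"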
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
index 5cf77aba39..6b5cf0b866 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -32,7 +32,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
* Analyze a transit URI as a HDFS path.
- * <li>qualifiedName=/path/fileName@clusterName (example: /app/warehouse/hive/db/default@cl1)
+ * <li>qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1)
* <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
*/
public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
@@ -43,12 +43,14 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
final Referenceable ref = new Referenceable(TYPE);
final URI uri = parseUri(event.getTransitUri());
- final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
+ final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
final String path = uri.getPath();
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
- ref.set(ATTR_CLUSTER_NAME, clusterName);
- ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path));
+ // The attribute 'clusterName' is in the 'hdfs_path' Atlas entity so it cannot be changed.
+ // Using 'namespace' as value for lack of better solution.
+ ref.set(ATTR_CLUSTER_NAME, namespace);
+ ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}
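Likewise for HDFS paths, a sketch under an assumed transit URI (the value is hypothetical):

    // Hypothetical example: for transitUri
    //   hdfs://namenode1.a.example.com:8020/app/warehouse/hive/db/default
    // parseUri(...) gives host "namenode1.a.example.com" and path
    // "/app/warehouse/hive/db/default". With a mapping resolving that host
    // to "ns1", the entity gets
    //   qualifiedName = "/app/warehouse/hive/db/default@ns1"
    // and, per the comment above, the legacy 'clusterName' attribute carries
    // the same namespace value.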
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index 7821f866df..51babe8d46 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -39,13 +39,13 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par
*
* If a Provenance event has 'query.input.tables' or 'query.output.tables' attributes then 'hive_table' DataSet reference is created:
- * <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
+ * <li>qualifiedName=tableName@namespace (example: myTable@ns1)
* <li>name=tableName (example: myTable)
*/
public class PutHiveStreaming extends AbstractHiveAnalyzer {
@@ -42,7 +42,7 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer {
}
final URI uri = parseUri(event.getTransitUri());
- final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
+ final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
final Set<Tuple<String, String>> outputTables = parseTableNames(null, event.getAttribute(ATTR_OUTPUT_TABLES));
if (outputTables.isEmpty()) {
return null;
@@ -50,7 +50,7 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer {
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
outputTables.forEach(tableName -> {
- final Referenceable ref = createTableRef(clusterName, tableName);
+ final Referenceable ref = createTableRef(namespace, tableName);
refs.addOutput(ref);
});
return refs;
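A sketch of the attribute-driven path above; the attribute value and the (database, table) layout of the parseTableNames result are assumptions:

    // Hypothetical example: with the FlowFile attribute
    //   query.output.tables = "default.request_logs,default.users"
    // parseTableNames(null, ...) would yield (database, table) tuples, and
    // createTableRef(namespace, tableName) would emit hive_table outputs
    // qualified as "default.request_logs@ns1" and "default.users@ns1".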
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
index 9e6726f153..4b2dbb4062 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Create.java
@@ -26,7 +26,7 @@ import java.util.List;
/**
* Analyze a CREATE event and create 'nifi_data' when there is no specific Analyzer implementation found.
- * <li>qualifiedName=NiFiComponentId@clusterName
+ * <li>qualifiedName=NiFiComponentId@namespace
* <li>name=NiFiComponentType (example: GenerateFlowFile)
*/
public class Create extends UnknownInput {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
index 3fd9d708ea..0debe7f6ae 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Fetch.java
@@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a FETCH event and create 'nifi_data' when there is no specific Analyzer implementation found.
- * <li>qualifiedName=NiFiComponentId@clusterName
+ * <li>qualifiedName=NiFiComponentId@namespace
* <li>name=NiFiComponentType (example: FetchXXX)
*/
public class Fetch extends UnknownInput {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
index 9ee2dedf64..c4f2318425 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Receive.java
@@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a RECEIVE event and create 'nifi_data' when there is no specific Analyzer implementation found.
- * <li>qualifiedName=NiFiComponentId@clusterName
+ * <li>qualifiedName=NiFiComponentId@namespace
* <li>name=NiFiComponentType (example: GetXXX)
*/
public class Receive extends UnknownInput {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
index 4d7d2052e6..202710eac4 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/RemoteInvocation.java
@@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a REMOTE_INVOCATION event and create 'nifi_data' when there is no specific Analyzer implementation found.
- * <li>qualifiedName=NiFiComponentId@clusterName
+ * <li>qualifiedName=NiFiComponentId@namespace
* <li>name=NiFiComponentType (example: XXX)
*/
public class RemoteInvocation extends UnknownOutput {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
index 81b4d6fd01..4396318094 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/Send.java
@@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a SEND event and create 'nifi_data' when there is no specific Analyzer implementation found.
- * <li>qualifiedName=NiFiComponentId@clusterName
+ * <li>qualifiedName=NiFiComponentId@namespace
* <li>name=NiFiComponentType (example: PutXXX)
*/
public class Send extends UnknownOutput {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
index 0117b195ab..c97ac20aaa 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
@@ -33,7 +33,7 @@ public abstract class UnknownDataSet extends AbstractNiFiProvenanceEventAnalyzer
protected Referenceable createDataSetRef(AnalysisContext context, ProvenanceEventRecord event) {
final Referenceable ref = new Referenceable(TYPE);
ref.set(ATTR_NAME, event.getComponentType());
- ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(context.getNiFiClusterName(), event.getComponentId()));
+ ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(context.getNiFiNamespace(), event.getComponentId()));
ref.set(ATTR_DESCRIPTION, event.getEventType() + " was performed by " + event.getComponentType());
return ref;
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
index ba53bf029b..d467da9785 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
@@ -90,11 +90,11 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
protected void addDataSetRefs(NiFiFlow nifiFlow, Set<NiFiFlowPath> flowPaths, DataSetRefs refs) {
// create reference to NiFi flow path.
final Referenceable flowRef = toReferenceable(nifiFlow);
- final String clusterName = nifiFlow.getClusterName();
+ final String namespace = nifiFlow.getNamespace();
final String url = nifiFlow.getUrl();
for (NiFiFlowPath flowPath : flowPaths) {
- final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, clusterName, url);
+ final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, namespace, url);
addDataSetRefs(refs, flowPathRef);
}
}
@@ -109,13 +109,13 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
protected Referenceable toReferenceable(NiFiFlowPath flowPath, NiFiFlow nifiFlow) {
return toReferenceable(flowPath, toReferenceable(nifiFlow),
- nifiFlow.getClusterName(), nifiFlow.getUrl());
+ nifiFlow.getNamespace(), nifiFlow.getUrl());
}
- private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String clusterName, String nifiUrl) {
+ private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String namespace, String nifiUrl) {
final Referenceable flowPathRef = new Referenceable(TYPE_NIFI_FLOW_PATH);
flowPathRef.set(ATTR_NAME, flowPath.getName());
- flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + clusterName);
+ flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + namespace);
flowPathRef.set(ATTR_NIFI_FLOW, flowRef);
flowPathRef.set(ATTR_URL, flowPath.createDeepLinkURL(nifiUrl));
// Referenceable has to have GUID assigned, otherwise it will not be stored due to lack of required attribute.
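For clarity, the qualified name here is assembled inline as id + "@" + namespace; with hypothetical values:

    // flowPath.getId()  -> "7375f8f6-4604-468d-144c-a8d71255027d" (hypothetical)
    // namespace         -> "ns1"
    // ATTR_QUALIFIED_NAME is then "7375f8f6-4604-468d-144c-a8d71255027d@ns1"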
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
index 03c9eaaaa4..1eb4e35cb8 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
@@ -256,7 +256,7 @@ public class CompleteFlowPathLineage extends AbstractLineageStrategy {
// In order to differentiate a queue between parents and this flow_path, add the hash into the queue qname.
// E.g, FF1 and FF2 read from dirA were merged, vs FF3 and FF4 read from dirB were merged then passed here, these two should be different queue.
if (queueBetweenParent != null) {
- queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getClusterName(), firstComponentId + "::" + hash));
+ queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getNamespace(), firstComponentId + "::" + hash));
}
// If the same components emitted multiple provenance events consecutively, merge it to come up with a simpler name.
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 1560c7d022..0bfc7876ea 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -40,9 +40,9 @@ import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
-import org.apache.nifi.atlas.resolver.ClusterResolver;
-import org.apache.nifi.atlas.resolver.ClusterResolvers;
-import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.resolver.NamespaceResolver;
+import org.apache.nifi.atlas.resolver.NamespaceResolvers;
+import org.apache.nifi.atlas.resolver.RegexNamespaceResolver;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
@@ -66,6 +66,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.StringSelector;
import java.io.File;
import java.io.FileInputStream;
@@ -89,6 +90,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -103,20 +105,22 @@ import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
-@DynamicProperty(name = "hostnamePattern.<ClusterName>", value = "hostname Regex patterns",
- description = RegexClusterResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+@DynamicProperty(name = "hostnamePattern.<Namespace>", value = "hostname Regex patterns",
+ description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {
+ private static final String ATLAS_URL_DELIMITER = ",";
static final PropertyDescriptor ATLAS_URLS = new PropertyDescriptor.Builder()
.name("atlas-urls")
.displayName("Atlas URLs")
.description("Comma separated URL of Atlas Servers" +
" (e.g. http://atlas-server-hostname:21000 or https://atlas-server-hostname:21443)." +
" For accessing Atlas behind Knox gateway, specify Knox gateway URL" +
- " (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas).")
- .required(true)
+ " (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas)." +
+ " If not specified, 'atlas.rest.address' in Atlas Configuration File is used.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@@ -192,11 +196,11 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("atlas-default-cluster-name")
- .displayName("Atlas Default Cluster Name")
- .description("Cluster name for Atlas entities reported by this ReportingTask." +
- " If not specified, 'atlas.cluster.name' in Atlas Configuration File is used." +
- " Cluster name mappings can be configured by user defined properties." +
- " See additional detail for detail.")
+ .displayName("Atlas Default Metadata Namespace")
+ .description("Namespace for Atlas entities reported by this ReportingTask." +
+ " If not specified, 'atlas.metadata.namespace' or 'atlas.cluster.name' (the former having priority) in Atlas Configuration File is used." +
+ " Multiple mappings can be configured by user defined properties." +
+ " See 'Additional Details...' for more.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -214,10 +218,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue("false")
.build();
- static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor KAFKA_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
- .displayName("SSL Context Service")
- .description("Specifies the SSL Context Service to use for communicating with Atlas and Kafka.")
+ .displayName("Kafka SSL Context Service")
+ .description("Specifies the SSL Context Service to use for communicating with Kafka.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@@ -249,9 +253,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue(SEC_PLAINTEXT.getValue())
.build();
- public static final PropertyDescriptor NIFI_KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("nifi-kerberos-principal")
- .displayName("NiFi Kerberos Principal")
+ .displayName("Kerberos Principal")
.description("The Kerberos principal for this NiFi instance to access Atlas API and Kafka brokers." +
" If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file." +
" This principal will be set into 'sasl.jaas.config' Kafka's property.")
@@ -259,9 +263,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- public static final PropertyDescriptor NIFI_KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
.name("nifi-kerberos-keytab")
- .displayName("NiFi Kerberos Keytab")
+ .displayName("Kerberos Keytab")
.description("The Kerberos keytab for this NiFi instance to access Atlas API and Kafka brokers." +
" If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file." +
" This principal will be set into 'sasl.jaas.config' Kafka's property.")
@@ -298,9 +302,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
"Create separate 'nifi_flow_path' Atlas Processes for each distinct input and output DataSet combinations" +
" by looking at the complete route for a given FlowFile. See also 'Additional Details.");
- static final PropertyDescriptor NIFI_LINEAGE_STRATEGY = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor LINEAGE_STRATEGY = new PropertyDescriptor.Builder()
.name("nifi-lineage-strategy")
- .displayName("NiFi Lineage Strategy")
+ .displayName("Lineage Strategy")
.description("Specifies granularity on how NiFi data flow should be reported to Atlas." +
" NOTE: It is strongly recommended to keep using the same strategy once this reporting task started to keep Atlas data clean." +
" Switching strategies will not delete Atlas entities created by the old strategy." +
@@ -314,55 +318,61 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
+ private static final String ATLAS_PROPERTY_METADATA_NAMESPACE = "atlas.metadata.namespace";
private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
+ private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
- private final ServiceLoader<ClusterResolver> clusterResolverLoader = ServiceLoader.load(ClusterResolver.class);
+ private final ServiceLoader<NamespaceResolver> namespaceResolverLoader = ServiceLoader.load(NamespaceResolver.class);
private volatile AtlasAuthN atlasAuthN;
private volatile Properties atlasProperties;
private volatile boolean isTypeDefCreated = false;
- private volatile String defaultClusterName;
+ private volatile String defaultMetadataNamespace;
private volatile ProvenanceEventConsumer consumer;
- private volatile ClusterResolvers clusterResolvers;
+ private volatile NamespaceResolvers namespaceResolvers;
private volatile NiFiAtlasHook nifiAtlasHook;
private volatile LineageStrategy lineageStrategy;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
+ // Basic atlas config
properties.add(ATLAS_URLS);
- properties.add(ATLAS_CONNECT_TIMEOUT);
- properties.add(ATLAS_READ_TIMEOUT);
+ properties.add(ATLAS_CONF_DIR);
+ properties.add(ATLAS_CONF_CREATE);
+ properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
+
+ // General config used by the processor
+ properties.add(LINEAGE_STRATEGY);
+ properties.add(PROVENANCE_START_POSITION);
+ properties.add(PROVENANCE_BATCH_SIZE);
+ properties.add(ATLAS_NIFI_URL);
properties.add(ATLAS_AUTHN_METHOD);
properties.add(ATLAS_USER);
properties.add(ATLAS_PASSWORD);
- properties.add(ATLAS_CONF_DIR);
- properties.add(ATLAS_NIFI_URL);
- properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
- properties.add(NIFI_LINEAGE_STRATEGY);
- properties.add(PROVENANCE_START_POSITION);
- properties.add(PROVENANCE_BATCH_SIZE);
- properties.add(SSL_CONTEXT_SERVICE);
// Following properties are required if ATLAS_CONF_CREATE is enabled.
// Otherwise should be left blank.
- properties.add(ATLAS_CONF_CREATE);
+ // Will be used by the atlas client by reading the values from the atlas config file
properties.add(KERBEROS_CREDENTIALS_SERVICE);
- properties.add(NIFI_KERBEROS_PRINCIPAL);
- properties.add(NIFI_KERBEROS_KEYTAB);
- properties.add(KAFKA_KERBEROS_SERVICE_NAME);
+ properties.add(KERBEROS_PRINCIPAL);
+ properties.add(KERBEROS_KEYTAB);
properties.add(KAFKA_BOOTSTRAP_SERVERS);
properties.add(KAFKA_SECURITY_PROTOCOL);
+ properties.add(KAFKA_KERBEROS_SERVICE_NAME);
+ properties.add(KAFKA_SSL_CONTEXT_SERVICE);
+ properties.add(ATLAS_CONNECT_TIMEOUT);
+ properties.add(ATLAS_READ_TIMEOUT);
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
- for (ClusterResolver resolver : clusterResolverLoader) {
+ for (NamespaceResolver resolver : namespaceResolverLoader) {
final PropertyDescriptor propertyDescriptor = resolver.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
if(propertyDescriptor != null) {
return propertyDescriptor;
@@ -371,42 +381,35 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
return null;
}
- private void parseAtlasUrls(final PropertyValue atlasUrlsProp, final Consumer<String> urlStrConsumer) {
- final String atlasUrlsStr = atlasUrlsProp.evaluateAttributeExpressions().getValue();
- if (atlasUrlsStr != null && !atlasUrlsStr.isEmpty()) {
- Arrays.stream(atlasUrlsStr.split(","))
- .map(String::trim)
- .forEach(urlStrConsumer);
- }
- }
-
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection results = new ArrayList<>();
- final boolean isSSLContextServiceSet = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
+ final boolean isSSLContextServiceSet = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).isSet();
final ValidationResult.Builder invalidSSLService = new ValidationResult.Builder()
- .subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
- parseAtlasUrls(context.getProperty(ATLAS_URLS), input -> {
- final ValidationResult.Builder builder = new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
- try {
- final URL url = new URL(input);
- if ("https".equalsIgnoreCase(url.getProtocol()) && !isSSLContextServiceSet) {
- results.add(invalidSSLService.explanation("required by HTTPS Atlas access").build());
- } else {
- results.add(builder.explanation("Valid URI").valid(true).build());
- }
- } catch (Exception e) {
- results.add(builder.explanation("Contains invalid URI: " + e).valid(false).build());
- }
- });
+ .subject(KAFKA_SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
+
+ String atlasUrls = context.getProperty(ATLAS_URLS).evaluateAttributeExpressions().getValue();
+ if (!StringUtils.isEmpty(atlasUrls)) {
+ Arrays.stream(atlasUrls.split(ATLAS_URL_DELIMITER))
+ .map(String::trim)
+ .forEach(input -> {
+ final ValidationResult.Builder builder = new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
+ try {
+ new URL(input);
+ results.add(builder.explanation("Valid URI").valid(true).build());
+ } catch (Exception e) {
+ results.add(builder.explanation("Contains invalid URI: " + e).valid(false).build());
+ }
+ });
+ }
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
results.addAll(atlasAuthN.validate(context));
- clusterResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
+ namespaceResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
if (context.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
@@ -430,8 +433,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
results.add(invalidSSLService.explanation("required by SSL Kafka connection").build());
}
- final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
- final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+ final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
@@ -469,13 +472,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// initAtlasClient has to be done first as it loads AtlasProperty.
initAtlasProperties(context);
initLineageStrategy(context);
- initClusterResolvers(context);
+ initNamespaceResolvers(context);
}
private void initLineageStrategy(ConfigurationContext context) throws IOException {
nifiAtlasHook = new NiFiAtlasHook();
- final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
+ final String strategy = context.getProperty(LINEAGE_STRATEGY).getValue();
if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
lineageStrategy = new SimpleFlowPathLineage();
} else if (LINEAGE_STRATEGY_COMPLETE_PATH.equals(strategy)) {
@@ -486,20 +489,17 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
initProvenanceConsumer(context);
}
- private void initClusterResolvers(ConfigurationContext context) {
- final Set<ClusterResolver> loadedClusterResolvers = new LinkedHashSet<>();
- clusterResolverLoader.forEach(resolver -> {
+ private void initNamespaceResolvers(ConfigurationContext context) {
+ final Set<NamespaceResolver> loadedNamespaceResolvers = new LinkedHashSet<>();
+ namespaceResolverLoader.forEach(resolver -> {
resolver.configure(context);
- loadedClusterResolvers.add(resolver);
+ loadedNamespaceResolvers.add(resolver);
});
- clusterResolvers = new ClusterResolvers(Collections.unmodifiableSet(loadedClusterResolvers), defaultClusterName);
+ namespaceResolvers = new NamespaceResolvers(Collections.unmodifiableSet(loadedNamespaceResolvers), defaultMetadataNamespace);
}
private void initAtlasProperties(ConfigurationContext context) throws IOException {
- List<String> urls = new ArrayList<>();
- parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
- final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final String confDirStr = context.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
@@ -532,17 +532,18 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
}
- // Resolve default cluster name.
- defaultClusterName = context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME).evaluateAttributeExpressions().getValue();
- if (defaultClusterName == null || defaultClusterName.isEmpty()) {
- // If default cluster name is not specified by processor configuration, then load it from Atlas config.
- defaultClusterName = atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME);
- }
+ List<String> urls = parseAtlasUrls(context.getProperty(ATLAS_URLS));
+ final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
- // If default cluster name is still not defined, processor should not be able to start.
- if (defaultClusterName == null || defaultClusterName.isEmpty()) {
- throw new ProcessException("Default cluster name is not defined.");
- }
+ setValue(
+ value -> defaultMetadataNamespace = value,
+ () -> {
+ throw new ProcessException("Default metadata namespace (or cluster name) is not defined.");
+ },
+ context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME),
+ atlasProperties.getProperty(ATLAS_PROPERTY_METADATA_NAMESPACE),
+ atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME)
+ );
String atlasConnectTimeoutMs = context.getProperty(ATLAS_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue() + "";
String atlasReadTimeoutMs = context.getProperty(ATLAS_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue() + "";
@@ -555,9 +556,11 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// enforce synchronous notification sending (needed for the checkpointing in ProvenanceEventConsumer)
atlasProperties.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
+ atlasProperties.put(ATLAS_PROPERTY_REST_ADDRESS, urls.stream().collect(Collectors.joining(ATLAS_URL_DELIMITER)));
atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
- atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, defaultClusterName);
+ atlasProperties.put(ATLAS_PROPERTY_METADATA_NAMESPACE, defaultMetadataNamespace);
+ atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, defaultMetadataNamespace);
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));
setKafkaConfig(atlasProperties, context);
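To illustrate the fallback chain implemented above, assume an atlas-application.properties containing both keys (values hypothetical):

    # atlas-application.properties (hypothetical values)
    atlas.metadata.namespace=ns1
    atlas.cluster.name=cluster-A

With 'Atlas Default Metadata Namespace' left blank on the reporting task, defaultMetadataNamespace resolves to 'ns1'; 'cluster-A' would be used only if atlas.metadata.namespace were absent. Both keys are then written back with the resolved value, presumably so that components still reading 'atlas.cluster.name' keep working.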
@@ -585,19 +588,63 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
if (confDir != null) {
// If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
Properties props = System.getProperties();
- final String atlasConfProp = "atlas.conf";
+ final String atlasConfProp = ApplicationProperties.ATLAS_CONFIGURATION_DIRECTORY_PROPERTY;
props.setProperty(atlasConfProp, confDir.getAbsolutePath());
getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
}
}
+ private List<String> parseAtlasUrls(final PropertyValue atlasUrlsProp) {
+ List<String> atlasUrls = new ArrayList<>();
+
+ setValue(
+ value -> Arrays.stream(value.split(ATLAS_URL_DELIMITER))
+ .map(String::trim)
+ .forEach(urlString -> {
+ try {
+ new URL(urlString);
+ } catch (Exception e) {
+ throw new ProcessException(e);
+ }
+ atlasUrls.add(urlString);
+ }
+ ),
+ () -> {
+ throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' " +
+ "property on the processor or the 'atlas.rest.address' property in the atlas configuration file.");
+ },
+ atlasUrlsProp,
+ atlasProperties.getProperty(ATLAS_PROPERTY_REST_ADDRESS)
+ );
+
+ return atlasUrls;
+ }
+
+ private void setValue(Consumer<String> setter, Runnable emptyHandler, PropertyValue elEnabledPropertyValue, String... properties) {
+ StringSelector valueSelector = StringSelector
+ .of(elEnabledPropertyValue.evaluateAttributeExpressions().getValue())
+ .or(properties);
+
+ if (valueSelector.found()) {
+ setter.accept(valueSelector.toString());
+ } else {
+ emptyHandler.run();
+ }
+ }
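A minimal usage sketch of the selection order setValue relies on, assuming StringSelector (as used above) skips empty strings and returns the first non-empty candidate:

    // Hypothetical values mirroring the call in initAtlasProperties:
    StringSelector selector = StringSelector
            .of("")                  // unset EL property evaluates to empty
            .or("ns1", "cluster-A"); // fallback candidates, checked in order
    selector.found();                // true - a non-empty value exists
    selector.toString();             // "ns1"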
+
+ private void checkAtlasUrls(List<String> urlStrings, ConfigurationContext context) {
+ if (urlStrings.isEmpty()) {
+ throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' " +
+ "property on the processor or the 'atlas.rest.address' porperty in the atlas configuration file.");
+ }
+ }
+
/**
* In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration),
* create Atlas client instance at every onTrigger execution.
*/
protected NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
- List<String> urls = new ArrayList<>();
- parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
+ List<String> urls = parseAtlasUrls(context.getProperty(ATLAS_URLS));
try {
return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{})));
} catch (final NullPointerException e) {
@@ -709,10 +756,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();
- final String clusterName;
+ final String namespace;
try {
final String nifiHostName = new URL(nifiUrl).getHost();
- clusterName = clusterResolvers.fromHostNames(nifiHostName);
+ namespace = namespaceResolvers.fromHostNames(nifiHostName);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to parse NiFi URL, " + e.getMessage(), e);
}
@@ -720,10 +767,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
NiFiFlow existingNiFiFlow = null;
try {
// Retrieve Existing NiFiFlow from Atlas.
- existingNiFiFlow = atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), clusterName);
+ existingNiFiFlow = atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), namespace);
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())){
- getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootProcessGroup.getId(), clusterName});
+ getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootProcessGroup.getId(), namespace});
} else {
throw new RuntimeException("Failed to fetch existing NiFI flow. " + e, e);
}
@@ -732,7 +779,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final NiFiFlow nifiFlow = existingNiFiFlow != null ? existingNiFiFlow : new NiFiFlow(rootProcessGroup.getId());
nifiFlow.setFlowName(flowName);
nifiFlow.setUrl(nifiUrl);
- nifiFlow.setClusterName(clusterName);
+ nifiFlow.setNamespace(namespace);
final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
@@ -744,7 +791,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
final EventAccess eventAccess = context.getEventAccess();
- final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
+ final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
(ProvenanceRepository)eventAccess.getProvenanceRepository());
consumer.consumeEvents(context, (componentMapHolder, events) -> {
@@ -770,7 +817,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + "security.protocol", kafkaSecurityProtocol);
// Translate SSLContext Service configuration into Kafka properties
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final SSLContextService sslContextService = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
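For reference, the keys composed above from ATLAS_KAFKA_PREFIX plus the SslConfigs constants end up looking like this (path and password hypothetical):

    atlas.kafka.security.protocol=SSL
    atlas.kafka.ssl.keystore.location=/etc/nifi/conf/keystore.jks
    atlas.kafka.ssl.keystore.password=keystorePassword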
@@ -802,8 +849,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, ReportingContext context) {
@@ -121,40 +121,41 @@
-Cluster Name Resolution
+Namespaces
An entity in Atlas can be identified by its GUID for any existing objects, or type name and unique attribute can be used if GUID is not known. Qualified name is commonly used as the unique attribute.
-Since one Atlas instance can be used to manage multiple environments, i.e clusters, Atlas has to manage objects in different clusters those may have the same name. For example, a Hive table 'request_logs' in a 'cluster-A' and 'cluster-B'. In such case, cluster name embedded in qualified names are crucial.
+One Atlas instance can be used to manage multiple environments and objects in different environments may have the same name. For example, a Hive table 'request_logs' in two different clusters, 'cluster-A' and 'cluster-B'. For this reason the qualified names contain a so-called metadata namespace.
+It's common practice to provide the cluster name as the namespace, but it can be any arbitrary string.
-For these requirements, a qualified name has 'componentId@clusterName' format. E.g. A Hive table qualified name would be dbName.tableName@clusterName (default.request_logs@cluster-A).
+With this, a qualified name has 'componentId@namespace' format. E.g. a Hive table qualified name would be dbName.tableName@namespace (default.request_logs@cluster-A).
-From this NiFi reporting task standpoint, a cluster name is need to be resolved at following situations:
+From this NiFi reporting task's standpoint, a namespace needs to be resolved in the following situations:
-To register NiFi component entities. Which cluster name should be used to represent the current NiFi cluster?
-To create lineages from NiFi component to other DataSets. Which cluster does the DataSet resides?
+To register NiFi component entities. Which namespace should be used to represent the current NiFi environment?
+To create lineages from NiFi components to other DataSets. Which environment does the DataSet reside in?
-To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from ip address or hostname to a cluster name.
- The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.ClusterName' format, having its value as a set of Regular Expression Patterns to match ip addresses or host names to a particular cluster name.
+To answer such questions, the ReportLineageToAtlas reporting task provides a way to define mappings from IP address or hostname to a namespace.
+ The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.namespace' format, having its value as a set of Regular Expression Patterns to match IP addresses or hostnames to a particular namespace.
-As an example, following mapping definition would resolve cluster name 'cluster-A' for ip address such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'cluster-B' for '192.168.40.223' or 'nifi3.b.example.com'.
+As an example, the following mapping definition would resolve namespace 'namespace-A' for IP addresses such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'namespace-B' for '192.168.40.223' or 'nifi3.b.example.com'.
-# Dynamic Property Name for cluster-A
-hostnamePattern.cluster-A
+# Dynamic Property Name for namespace-A
+hostnamePattern.namespace-A
# Value can have multiple Regular Expression patterns separated by new line
192\.168\.30\.\d+
[^\.]+\.a\.example\.com
-# Dynamic Property Name for cluster-B
-hostnamePattern.cluster-B
+# Dynamic Property Name for namespace-B
+hostnamePattern.namespace-B
# Values
192\.168\.40\.\d+
[^\.]+\.b\.example\.com
-If any cluster name mapping does not match, then a name defined at 'Atlas Default Cluster Name' is used.
+If no namespace mapping matches, then the name defined in the 'Atlas Default Metadata Namespace' property is used.
NiFi flow structure
@@ -271,11 +272,11 @@ Processor 3
To identify such Process and DataSet Atlas entities, this reporting task uses NiFi Provenance Events. At least, the reporting task needs to derive following information from a NiFi Provenance event record:
typeName (e.g. fs_path, hive_table)
-qualifiedName in uniqueId@clusterName (e.g. /data/A1.csv@BranchOffice1)
+qualifiedName in uniqueId@namespace (e.g. /data/A1.csv@ns1)
-'clusterName' in 'qualifiedName' attribute is resolved by mapping ip-address or hostname available at NiFi Provenance event 'transitUri' to a cluster name. See Cluster Name Resolution for detail.
+'namespace' in the 'qualifiedName' attribute is resolved by mapping the IP address or hostname available in the NiFi Provenance event 'transitUri' to a namespace. See Namespaces for details.
For 'typeName' and 'qualifiedName', different analysis rules are needed for different DataSet. ReportLineageToAtlas provides an extension point called 'NiFiProvenanceEventAnalyzer' to implement such analysis logic for particular DataSets.
NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.
-For 'nifi_flow_path': remoteOutputPortGUID@clusterName (e.g. 7375f8f6-4604-468d-144c-a8d71255027d@cl1)
+For 'nifi_flow_path': remoteOutputPortGUID@namespace (e.g. 7375f8f6-4604-468d-144c-a8d71255027d@ns1)
NOTE: The remoteOutputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Output Ports can pull from the same target remote output port.
-For 'nifi_queue': downstreamPathGUID@clusterName (e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)
+For 'nifi_queue': downstreamPathGUID@namespace (e.g. bb530e58-ee14-3cac-144c-a8d71255027d@ns1)
@@ -409,7 +410,7 @@ remote target port
nifi_input_port
nifi_output_port
-
@@ -435,7 +436,7 @@ remote target port
(Protocol can be either PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL)
kafka_topic
-topicName@clusterName (e.g. testTopic@cl1)
+topicName@namespace (e.g. testTopic@ns1)
NOTE: With Atlas earlier than 0.8.2, the same topic name in different clusters cannot be created using the pre-built 'kafka_topic'. See ATLAS-2286.
@@ -444,7 +445,7 @@ remote target port
SEND
thrift://hive.example.com:9083
hive_table
-tableName@clusterName (e.g. myTable@cl1)
+tableName@namespace (e.g. myTable@ns1)
@@ -460,7 +461,7 @@ remote target port
jdbc:hive2://hive.example.com:10000/default
hive_table
-tableName@clusterName (e.g. myTable@cl1)
+tableName@namespace (e.g. myTable@ns1)
The corresponding Processors parse Hive QL to set 'query.input.tables' and 'query.output.tables' FlowFile attributes. These attribute values are used to create qualified name.